How to keep up to date with all the COVID data automatically

Keeping up with the ups and downs of a pandemic is not an easy task for a data scientist. This text explains the code below.
import pandas as pd
import pycountry
covid_data_url = 'https://covid.ourworldindata.org/data/owid-covid-data.csv'
stringency_api_url = 'https://covidtrackerapi.bsg.ox.ac.uk/api/stringency'
def covid():
    """
    Download the OWID covid dataset, attach the Oxford stringency index,
    and return one wide DataFrame indexed by date with a
    (variable, iso_code) MultiIndex on the columns.
    """
    import datetime as dt
    import requests as req

    df = pd.read_csv(covid_data_url)
    df['date'] = pd.to_datetime(df['date'])
    # Wide format: one column per (variable, iso_code), numeric data only.
    # head(-1) drops the newest date, which is usually still incomplete.
    df = (df.drop(columns=['continent', 'stringency_index', 'location', 'tests_units'])
            .pivot(index='date', columns=['iso_code'])
            .sort_values(by='date')
            .select_dtypes(['number'])
            .apply(pd.to_numeric, errors='coerce')  # TODO: coerce all columns to float
            .drop_duplicates()
            .dropna(axis=1, how='all')
            .head(-1))
    df.columns.names = ['variable', 'iso_code']

    start_date = df.index.min().strftime('%Y-%m-%d')
    now_date = df.index.max().strftime('%Y-%m-%d')

    # Fetch the Oxford stringency index for the same date range.
    stringency_api = f"{stringency_api_url}/date-range/{start_date}/{now_date}"
    stringency_df = pd.DataFrame.from_dict(
        req.get(stringency_api).json()['data'],
        orient='index')

    def extract_stringency(stringency_series, start_date='2020-01-02',
                           end_date=dt.datetime.now().strftime('%Y-%m-%d')):
        """Turn one country's raw API records into a daily, interpolated series."""
        try:
            country = pd.DataFrame.from_dict(
                {rec['date_value']: [rec['stringency']]
                 for rec in stringency_series.values if not pd.isna(rec)},
                orient='index').rename(columns={0: stringency_series.name})
            country = country.set_index(
                pd.DatetimeIndex(pd.to_datetime(country.index.values, format='%Y-%m-%d'),
                                 name='day')
            ).groupby(level=0).mean()  # duplicate values per day: take their mean
            country = country.reindex(pd.date_range(start=start_date, end=end_date))
            country.interpolate(method='linear', inplace=True)
            return country
        except (KeyError, TypeError, ValueError, AttributeError):
            # Malformed payload for this country: skip it (pd.concat drops None).
            return None

    def stringency_data_df(raw_df):
        """Assemble every country's interpolated stringency series column-wise."""
        end_date = df.index.max().strftime('%Y-%m-%d')
        frames = [extract_stringency(raw_df[iso_alpha], end_date=end_date)
                  for iso_alpha in raw_df.columns]
        # Empty seed so concat succeeds even if every country failed.
        return pd.concat([pd.DataFrame([])] + frames, axis=1)

    # Outer key is the variable name, matching df's column level order.
    stringency = pd.concat([stringency_data_df(stringency_df)],
                           keys=['stringency'], names=['variable'], axis=1)
    return pd.concat([df, stringency], axis=1)
from sklearn.preprocessing import QuantileTransformer
def covid_map_df(covid_data):
    """
    Build a one-row-per-country frame for the choropleth map.

    covid map contains:
    name of the country
    alpha_3
    plus a quantile-transformed companion for every per-capita column.
    """
    qt = QuantileTransformer(n_quantiles=150)
    # Latest date's cross-section, with countries on the rows.
    covid_map = pd.DataFrame(covid_data.loc[covid_data.index.max()]).unstack(level=0)  # .dropna()
    covid_map.columns = covid_map.columns.get_level_values(1).values
    # Map each per-capita column onto its quantile rank for comparable coloring.
    for col in covid_map.filter(regex="_per").columns:
        new_col_name = str(col + '_relative_change')
        covid_map[new_col_name] = qt.fit_transform(covid_map[col].values.reshape(-1, 1))
    covid_map.index.name = 'alpha_3'
    covid_map.reset_index(inplace=True)

    def _country_name(code):
        # not every iso_code has a full name via pycountry; fall back to the code
        country = pycountry.countries.get(alpha_3=code)
        return country.name if country else code

    covid_map['name'] = covid_map['alpha_3'].apply(_country_name)
    return covid_map
# Build an interactive choropleth with a dropdown to switch between columns.
cols_dd = list(set(chloropleth_df.columns.tolist()) - {'iso_code', 'location', 'tests performed', 'tests_units'})
visible = np.array(cols_dd)
df = chloropleth_df

# One trace and one dropdown button per selectable column;
# only the first trace starts out visible.
traces = []
buttons = []
for idx, value in enumerate(cols_dd):
    traces.append(go.Choropleth(
        locations=df['iso_code'],   # Spatial coordinates
        z=df[value].astype(float),  # Data to be color-coded
        colorbar_title=value,
        visible=(idx == 0)))
    buttons.append(dict(
        label=value,
        method="update",
        args=[{"visible": list(visible == value)},
              {"title": f"<b>{value}</b>"}]))

updatemenus = [{"active": 0, "buttons": buttons}]

# Show figure
fig = go.Figure(data=traces, layout=dict(updatemenus=updatemenus))
# Plotly does not apply the active button's title on first render,
# so set it explicitly to get the first title displayed correctly.
first_title = cols_dd[0]
fig.update_layout(title=f"<b>{first_title}</b>", title_x=0.5)
fig.show()
We want to store all this data in a central database. To simulate this during development we will run a local server; in production, simply switch the database URL to the production one.
# Install the Python DB libraries, then a local PostgreSQL server to stand in
# for the production database during development (notebook shell commands).
!pip install flask_sqlalchemy sqlalchemy-media psycopg2-binary --quiet
# install
!apt install postgresql postgresql-contrib &>log
!service postgresql start
# Give the notebook's user superuser rights so it can create tables freely.
!sudo -u postgres psql -c "CREATE USER root WITH SUPERUSER"
# set connection
%load_ext sql
%config SqlMagic.feedback=False
# Return %sql query results as pandas DataFrames.
%config SqlMagic.autopandas=True
%sql postgresql+psycopg2://@/postgres
In production, switch this connection string to the appropriate one for your database.
from sqlalchemy import create_engine

# Engine for the local development database; swap the DSN for the
# production connection string when deploying.
local_engine = create_engine('postgresql+psycopg2://@/postgres')
Then we need to store the data. However, since the DataFrame has a MultiIndex on its columns, we first reshape it into a long format that maps cleanly onto a SQL table:
def to_sql(df, name, con, if_exists='fail', chunksize=3276):
    """
    Persist a (possibly MultiIndex-columned) DataFrame as a long-format table.

    The frame is unstacked to one row per (index, column-label) pair, axis
    names are lowercased, and rows are sorted by index then column labels.

    :df: DataFrame to store (not mutated)
    :name: target table name
    :con: DB connection / sqlalchemy engine accepted by pandas ``to_sql``
    :if_exists: passed through to pandas ``to_sql``
    :chunksize: rows per multi-value INSERT statement
    """
    # Work on a copy: the original version set df.index.name / df.columns.name
    # in place, mutating the caller's frame as a side effect.
    df = df.copy()
    if not df.index.name:
        df.index.name = 'idx'
    if not df.columns.name:
        df.columns.name = 'col'
    index_name = df.index.name.lower()
    sort_cols = [index_name] + [col.lower() for col in list(df.columns.names)]
    (df.rename_axis(index=str.lower)   # lowercase the index axis name
       .unstack()                      # long format: one row per (label, index)
       .rename_axis(index=str.lower)   # lowercase the stacked level names too
       .reset_index()
       .set_index(index_name)
       .sort_values(by=sort_cols)
       .to_sql(name, con=con,
               if_exists=if_exists,
               method='multi',
               chunksize=chunksize, index=True))
# read SQL
def read_sql(name: str, con: sa.engine.base.Engine, column_filter=None, row_filter=None) -> pd.DataFrame:
    """
    Read a long-format table (as written by ``to_sql``) back into a wide DataFrame.

    :name: name of the table in the db
    :con: sqlalchemy connection engine
    :column_filter: dict in format {0:['value_0','value_1'],1:['value']}, where 0 refers to the level of the multiindex column
    :row_filter: dict in format {'from':'2010-01-01','to':'2011-01-01'} or {'last':'10Q'}
    """
    # NOTE(review): `name` and column names are interpolated into the SQL text
    # (identifiers cannot be bound as parameters) — never pass untrusted input.
    # Inspector.has_table replaces Engine.has_table (removed in SQLAlchemy 2.0).
    if not sa.inspect(con).has_table(name):  # check whether the table exists
        raise Exception(f"Table {name} does not exist")
    type_dict = sa.inspect(con).get_columns(name)
    # The single DATE/TIMESTAMP column becomes the DataFrame index ...
    index_col = [
        col['name'] for col in type_dict
        if any(datetype in col['type'].__str__() for datetype in ['TIMESTAMP', 'DATE'])
    ][0]
    # ... TEXT columns hold the stacked (long-format) column labels ...
    stack_cols = [col['name'] for col in type_dict if 'TEXT' in col['type'].__str__()]
    # ... and the single numeric column holds the values.
    value_col = [
        col['name'] for col in type_dict
        if 'DOUBLE' in col['type'].__str__() or 'INT' in col['type'].__str__()
    ][0]
    params = tuple()
    query = f"SELECT * FROM {name}"  # base query
    if column_filter:  # select which columns to fetch from db
        query += " WHERE "
        query += " AND ".join([f"{stack_cols[int(key)]} IN ({','.join(['%s' for _ in value])})"
                               for key, value in column_filter.items()])
        params = tuple([value for value in chain(*column_filter.values())])
    if row_filter:  # select between which dates to fetch
        query += ' AND ' if column_filter else ' WHERE '
        if 'from' in row_filter.keys() and not 'last' in row_filter.keys():
            query += f" {index_col} >= %s"
            params += tuple([dt.datetime.fromisoformat(row_filter['from'])])
        if 'to' in row_filter.keys() and not 'last' in row_filter.keys():
            if 'from' in row_filter.keys():
                query += ' AND '
            query += f" {index_col} <= %s"
            params += tuple([dt.datetime.fromisoformat(row_filter['to'])])
        if 'last' in row_filter.keys() and not ('to' in row_filter.keys() or 'from' in row_filter.keys()):
            # 'last' means: everything newer than max(date) minus a pandas offset (e.g. '10Q').
            query += f" {index_col} >= %s"
            max_date = con.engine.execute(f'SELECT MAX({index_col}) FROM {name}').fetchone()[0]
            params += tuple([(max_date - pd.tseries.frequencies.to_offset(row_filter['last'])).to_pydatetime()])
    return pd.read_sql_query(query,
                             con=con,
                             index_col=index_col,
                             params=params
                             ).pivot(columns=stack_cols, values=value_col)
def get_last_source_date(table_name: str, con) -> dt.datetime:
    """Return the newest value stored in *table_name*'s date/timestamp column."""
    import sqlalchemy as sa
    # The first DATE/TIMESTAMP-typed column is treated as the table's time index.
    date_columns = [
        column['name']
        for column in sa.inspect(con).get_columns(table_name)
        if 'TIMESTAMP' in str(column['type']) or 'DATE' in str(column['type'])
    ]
    index_col = date_columns[0]
    query = f'SELECT MAX({index_col}) FROM {table_name}'
    return con.engine.execute(query).fetchone()[0]