Keeping up with the ups and downs of a pandemic is not an easy task for a data scientist.

This text explains the code below. It downloads the Our World in Data COVID-19 dataset, enriches it with the Oxford government response (stringency) index, visualises the latest numbers on a world map, and stores everything in a PostgreSQL database.

import pandas as pd
import pycountry


covid_data_url = 'https://covid.ourworldindata.org/data/owid-covid-data.csv'
stringency_api_url = 'https://covidtrackerapi.bsg.ox.ac.uk/api/stringency'

def covid():
    import datetime as dt
    import requests as req

    # download the raw OWID dataset and parse the dates
    df = pd.read_csv(covid_data_url)
    df['date'] = pd.to_datetime(df['date'])

    # drop the non-numeric metadata columns, pivot to one column per
    # (variable, iso_code) pair, coerce everything to numeric floats and
    # drop empty columns, duplicate rows and the (usually incomplete) last row
    df = (df.drop(columns=['continent', 'stringency_index', 'location', 'tests_units'])
            .pivot(index='date', columns=['iso_code'])
            .sort_values(by='date')
            .select_dtypes(['number'])
            .apply(pd.to_numeric, errors='coerce')
            .astype(float)  # make all columns floats
            .drop_duplicates()
            .dropna(axis=1, how='all')
            .head(-1))
    df.columns.names = ['variable', 'iso_code']

    # first and last day of the data, formatted for the stringency API
    start_date = df.index.min().strftime('%Y-%m-%d')
    now_date = df.index.max().strftime('%Y-%m-%d')

    # fetch the Oxford stringency index for the same date range
    stringency_api = f"{stringency_api_url}/date-range/{start_date}/{now_date}"
    stringency_df = pd.DataFrame.from_dict(
        req.get(stringency_api).json()['data'],
        orient='index')

    def extract_stringency(stringency_series, start_date='2020-01-02', end_date=dt.datetime.now().strftime('%Y-%m-%d')):
        try:
            # one column per country: map each date_value to its stringency score
            df = pd.DataFrame.from_dict(
                {i['date_value']: [i['stringency']] for i in stringency_series.values if not pd.isna(i)},
                orient='index').rename(columns={0: stringency_series.name})
            df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df.index.values, format='%Y-%m-%d'),
                                               name='day')).groupby(
                level=0).mean()  # in case of double values per day, take their mean

            # align to a continuous daily index and interpolate missing days
            df = df.reindex(pd.date_range(start=start_date, end=end_date))
            df.interpolate(method='linear', inplace=True)
            return df
        except Exception:
            # countries with missing or malformed stringency data are skipped
            return None

    def stringency_data_df(stringency_df):
        # build one stringency column per country and line them up on the date index
        data_df = pd.DataFrame([])
        for iso_alpha in stringency_df.columns:
            data_df = pd.concat([
                data_df,
                extract_stringency(stringency_df[iso_alpha], end_date=df.index.max().strftime('%Y-%m-%d'))
            ], axis=1)

        return data_df

    # wrap the stringency frame in the same ('variable', 'iso_code') column structure as df
    stringency = pd.concat([stringency_data_df(stringency_df)], keys=['stringency'],
                           names=['variable', 'iso_code'], axis=1)

    # join the case data and the stringency index on the date index
    df = pd.concat([df, stringency], axis=1)

    return df
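The returned frame is indexed by date with a two-level (variable, iso_code) column index, so a single country's series can be pulled out directly. A small sketch (the column name and country code are examples, assuming they still exist in the OWID data):

covid_df = covid()
# daily new cases for Germany
print(covid_df[('new_cases', 'DEU')].tail())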

Visualisation: to put the latest numbers on a world map, we reshape the most recent day of data into one row per country and quantile-transform the per-capita columns so that colours are comparable across countries.

from sklearn.preprocessing import QuantileTransformer

def covid_map_df(covid_data):
    """
    Build a per-country frame for the map: one row per country with its
    alpha_3 code, full name, the latest value of every variable, and a
    quantile-transformed version of every per-capita column.
    """
    qt = QuantileTransformer(n_quantiles=150)

    # take the most recent day and reshape to one row per country
    covid_map = pd.DataFrame(covid_data.loc[covid_data.index.max()]).unstack(level=0)
    covid_map.columns = covid_map.columns.get_level_values(1).values

    # quantile-transform the per-capita columns so countries are comparable on the map
    for col in covid_map.filter(regex="_per").columns:
        new_col_name = col + '_relative_change'
        covid_map[new_col_name] = qt.fit_transform(covid_map[col].values.reshape(-1, 1))

    covid_map.index.name = 'alpha_3'
    covid_map.reset_index(inplace=True)
    # not every iso_code has a full name via pycountry
    covid_map['name'] = covid_map['alpha_3'].apply(
        lambda cd: pycountry.countries.get(alpha_3=cd).name if pycountry.countries.get(alpha_3=cd) else cd)
    return covid_map
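The plotting code below expects a flat frame called chloropleth_df with an iso_code column. A minimal sketch of how it could be built from the functions above; renaming alpha_3 to iso_code and name to location is an assumption, since that step is not shown in the original:

# sketch: build the frame used by the choropleth code below
chloropleth_df = covid_map_df(covid()).rename(columns={'alpha_3': 'iso_code', 'name': 'location'})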
import numpy as np
import plotly.graph_objects as go

# drop the identifier columns; sort for a deterministic default selection
cols_dd = sorted(set(chloropleth_df.columns.tolist()) - {'iso_code', 'location', 'tests performed', 'tests_units'})
visible = np.array(cols_dd)
df = chloropleth_df
# define traces and buttons at once
traces = []
buttons = []
for value in cols_dd:

    traces.append(go.Choropleth(
        locations=df['iso_code'],  # Spatial coordinates
        z=df[value].astype(float),  # Data to be color-coded
        colorbar_title=value,
        visible=(value == cols_dd[0])))

    buttons.append(dict(label=value,
                        method="update",
                        args=[{"visible":list(visible==value)},
                              {"title":f"<b>{value}</b>"}]))

updatemenus = [{"active":0,
                "buttons":buttons,
               }]


# Show figure
fig = go.Figure(data=traces,
                layout=dict(updatemenus=updatemenus))
# This is in order to get the first title displayed correctly
first_title = cols_dd[0]
fig.update_layout(title=f"<b>{first_title}</b>",title_x=0.5)
fig.show()

We want to store all of this data in a central database. To simulate that during development we run a local PostgreSQL server; in production you would simply point the database URL at your production instance.

!pip install flask_sqlalchemy sqlalchemy-media psycopg2-binary --quiet
# install
!apt install postgresql postgresql-contrib &>log
!service postgresql start
!sudo -u postgres psql -c "CREATE USER root WITH SUPERUSER"
# set connection
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql postgresql+psycopg2://@/postgres
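To check that the notebook really is talking to the freshly started local server, a quick sanity query can be run through the SQL magic (this check is an addition for illustration, not part of the original notebook):

%sql SELECT current_database(), version();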

In production, switch this connection string to the one appropriate for your database.

from sqlalchemy import create_engine
local_engine = create_engine('postgresql+psycopg2://@/postgres')
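A production engine would typically point at a real server instead; the host, credentials and database name below are placeholders, not values from this project:

# hypothetical production connection string; replace the placeholders with your own
prod_engine = create_engine('postgresql+psycopg2://user:password@db.example.com:5432/covid')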

Then we need to store the data. However, since the dataframe has a two-level column index, it first has to be unstacked into a long format (one value per row) so that it fits into a regular SQL table.

def to_sql(df, name, con, if_exists='fail', chunksize=3276):
    # make sure the index and column axes have names, so they survive as SQL columns
    if not df.index.name: df.index.name = 'idx'
    if not df.columns.name: df.columns.name = 'col'
    # unstack the wide frame into long format (one value per row),
    # lower-case the axis names, sort and write to the database in chunks
    df.rename_axis(index=str.lower).unstack().rename_axis(index=str.lower).reset_index().set_index(
        df.index.name.lower()).sort_values(
        by=[df.index.name.lower()] + [col.lower() for col in list(df.columns.names)]).to_sql(
        name, con=con, if_exists=if_exists, method='multi', chunksize=chunksize, index=True)
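For example, the frame returned by covid() can be written to a table called covid on the local engine like this (the table name and the if_exists policy are choices made for this sketch, not prescribed by the original code):

# write the full covid frame to the local database (sketch; covid_df is the frame returned by covid())
to_sql(covid_df, 'covid', local_engine, if_exists='replace')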
# read SQL
import sqlalchemy as sa
import datetime as dt
from itertools import chain

def read_sql(name: str, con: sa.engine.base.Engine, column_filter=None, row_filter=None) -> pd.DataFrame:
    """
    :name: name of the table in the db
    :con: sqlalchemy connection engine
    :column_filter: dict in format {0:['value_0','value_1'],1:['value']}, where 0 refers to the level of the multiindex column
    :row_filter: dict in format {'from':'2010-01-01','to':'2011-01-01'} or {'last':'10Q'}
    """
    if not con.has_table(name):  # check whether the table exists
        raise Exception(f"Table {name} does not exist")
    else:
        type_dict = sa.inspect(con).get_columns(name)
        index_col = [
                    col['name'] for col in type_dict if
                    any(datetype in col['type'].__str__() for datetype in ['TIMESTAMP','DATE'])
                    ][0]
        stack_cols = [col['name'] for col in type_dict if 'TEXT' in col['type'].__str__()]
        value_col = \
            [col['name'] for col in type_dict if ('DOUBLE' in col['type'].__str__() or 'INT' in col['type'].__str__())][
                0]
        params = tuple()

        query = f"SELECT * FROM {name}"  # base query

        if column_filter:  # select which column to fetch from db
            query += f" WHERE "
            query += " AND ".join([f"{stack_cols[int(key)]} IN ({','.join(['%s' for _ in value])})" for key, value in
                                   column_filter.items()])
            params = tuple([value for value in chain(*column_filter.values())])
        if row_filter:  # select between which dates to fetch
            if not column_filter:
                query += ' WHERE '
            else:
                query += ' AND '
            if 'from' in row_filter.keys() and not 'last' in row_filter.keys():
                query += f" {index_col} >= %s"
                params += tuple([dt.datetime.fromisoformat(row_filter['from'])])
            if 'to' in row_filter.keys() and not 'last' in row_filter.keys():
                if 'from' in row_filter.keys(): query += ' AND '
                query += f" {index_col} <= %s"
                params += tuple([dt.datetime.fromisoformat(row_filter['to'])])
            if 'last' in row_filter.keys() and not ('to' in row_filter.keys() or 'from' in row_filter.keys()):
                query += f" {index_col} >= %s"
                max_date = con.engine.execute(f'SELECT MAX({index_col}) FROM {name}').fetchone()[0]
                params += tuple([(max_date - pd.tseries.frequencies.to_offset(row_filter['last'])).to_pydatetime()])

        return pd.read_sql_query(query,
                                 con=con,
                                 index_col=index_col,
                                 params=params
                                 ).pivot(columns=stack_cols, values=value_col)
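As a sketch of how the filters combine, this would fetch only new_cases for two countries over the last 90 days (the table name and filter values are illustrative):

# fetch new_cases for Germany and the US over the last 90 days (sketch)
recent = read_sql('covid', local_engine,
                  column_filter={0: ['new_cases'], 1: ['DEU', 'USA']},
                  row_filter={'last': '90D'})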
def get_last_source_date(table_name: str, con) -> dt.datetime:
    # find the date/timestamp column of the table and return its maximum value
    type_dict = sa.inspect(con).get_columns(table_name)
    index_col = [
                col['name'] for col in type_dict if
                any(datetype in col['type'].__str__() for datetype in ['TIMESTAMP', 'DATE'])
                ][0]
    last_source_date = con.engine.execute(f'SELECT MAX({index_col}) FROM {table_name}').fetchone()[0]
    return last_source_date
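A sketch of how this helps with incremental updates: only rows newer than what the table already holds get appended (the slicing logic here is an illustration, not part of the original code):

# append only rows newer than the latest date already stored (sketch)
last_date = get_last_source_date('covid', local_engine)
fresh = covid_df.loc[covid_df.index > pd.Timestamp(last_date)]
if not fresh.empty:
    to_sql(fresh, 'covid', local_engine, if_exists='append')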