A Simple Data Engineering Workflow
Module 3 – Simple Data Engineering: Data Ingestion to Transformation¶
Author: Mariama Jaiteh, Helmholtz Centre for Infection Research, NFDI4Immuno
Data Sources: RKI collection of German Infection data¶
- The SARS-CoV-2 Infection datasets from Germany. Source: RKI repositories
Here is the general data schema of the infection dataset.
{
"fields": [
{
"name": "IdLandkreis",
"type": "integer",
"description": "Identifikationsnummer des Landkreises basierend auf dem Amtlichen Gemeindeschlüssel (AGS) zuzüglich der 12 Bezirke Berlins (11001 bis 11012); Gebietsstand: 30.06.2020 (2. Quartal)"
},
{
"name": "Altersgruppe",
"type": "string",
"description": "Altersspanne der in der Gruppe enthaltenen Fälle, stratifiziert nach 0-4 Jahren, 5-14 Jahren, 15-34 Jahren, 35-59 Jahren, 60-79 Jahren, 80+ Jahren sowie unbekannt",
"constraints": {
"enum": [
"A00-A04",
"A05-A14",
"A15-A34",
"A35-A59",
"A60-A79",
"A80+",
"unbekannt"
]
}
},
{
"name": "Geschlecht",
"type": "string",
"description": "Geschlecht der Fallgruppe: weiblich (W), männlich (M) und (unbekannt)",
"constraints": {
"enum": [
"W",
"M",
"unbekannt"
]
}
},
{
"name": "Meldedatum",
"type": "date",
"description": "Datum, wann der Fall dem Gesundheitsamt bekannt geworden ist. JJJJ entspricht der Jahreszahl, MM dem Monat und TT dem Tag.",
"format": "%Y-%m-%d"
},
{
"name": "Refdatum",
"type": "date",
"description": "Datum des Erkrankungsbeginns. Wenn das nicht bekannt ist, das Meldedatum.",
"format": "%Y-%m-%d"
},
{
"name": "IstErkrankungsbeginn",
"type": "boolean",
"description": "1: Refdatum ist der Erkrankungsbeginn 0: Refdatum ist das Meldedatum",
"constraints": {
"enum": [
"0",
"1"
]
}
},
{
"name": "NeuerFall",
"type": "integer",
"description": "`0` : Fälle der Gruppe sind in der Publikation für den aktuellen Tag und in der für den Vortag enthalten. Das bedeutet diese Fälle sind seit mehr als einem Tag bekannt. <br> `1` : Fälle der Gruppe sind erstmals in der aktuellen Publikation enthalten. Das heißt, es sind für den Publikationstag neu übermittelte oder entsprechend neu bewertete Fälle.<br> `-1`: Fälle der Gruppe sind in der Publikation des Vortags enthalten, werden jedoch nach dem aktuellen Tag aus den Fallzahlendaten entfernt. Das heißt, es sind Fälle die ab dem aktuellen Tag wegfallen. Eine solche Fallgruppe kann beispielsweise durch fälschliche Meldungen entstehen, die so als Korrektur angezeigt werden.",
"constraints": {
"enum": [
"-1",
"0",
"1"
]
}
},
{
"name": "NeuerTodesfall",
"type": "integer",
"description": "`0` : Fälle der Gruppe sind in der Publikation für den aktuellen Tag und in der für den Vortag enthalten. Das bedeutet diese Fälle sind seit mehr als einem Tag bekannt. <br> `1` : Fälle der Gruppe sind erstmals in der aktuellen Publikation enthalten. Das heißt, es sind für den Publikationstag neu übermittelte oder entsprechend neu bewertete Fälle.<br> `-1`: Fälle der Gruppe sind in der Publikation des Vortags enthalten, werden jedoch nach dem aktuellen Tag aus den Fallzahlendaten entfernt. Das heißt, es sind Fälle die ab dem aktuellen Tag wegfallen. Eine solche Fallgruppe kann beispielsweise durch fälschliche Meldungen entstehen, die so als Korrektur angezeigt werden.<br> `-9`: Fälle in der Gruppe sind weder in der Publikation für den aktuellen Tag, noch in der Publikation des Vortags, als genesen (\"NeuGenesen\") oder verstorben (\"NeuerTodesfall\") gemeldet. Das bedeutet, dass zu den Fällen in der Gruppe keine Information über den Gesundheitsverlauf der Infektion bekannt ist. Das ist zum Beispiel häufig der Fall, wenn eine Fallgruppe gerade erst als infiziert gemeldet worden ist.",
"constraints": {
"enum": [
"-9",
"-1",
"0",
"1"
]
}
},
{
"name": "NeuGenesen",
"type": "integer",
"description": "`0` : Fälle der Gruppe sind in der Publikation für den aktuellen Tag und in der für den Vortag enthalten. Das bedeutet diese Fälle sind seit mehr als einem Tag bekannt. <br> `1` : Fälle der Gruppe sind erstmals in der aktuellen Publikation enthalten. Das heißt, es sind für den Publikationstag neu übermittelte oder entsprechend neu bewertete Fälle.<br> `-1`: Fälle der Gruppe sind in der Publikation des Vortags enthalten, werden jedoch nach dem aktuellen Tag aus den Fallzahlendaten entfernt. Das heißt, es sind Fälle die ab dem aktuellen Tag wegfallen. Eine solche Fallgruppe kann beispielsweise durch fälschliche Meldungen entstehen, die so als Korrektur angezeigt werden.<br> `-9`: Fälle in der Gruppe sind weder in der Publikation für den aktuellen Tag, noch in der Publikation des Vortags, als genesen (\"NeuGenesen\") oder verstorben (\"NeuerTodesfall\") gemeldet. Das bedeutet, dass zu den Fällen in der Gruppe keine Information über den Gesundheitsverlauf der Infektion bekannt ist. Das ist zum Beispiel häufig der Fall, wenn eine Fallgruppe gerade erst als infiziert gemeldet worden ist.",
"constraints": {
"enum": [
"-9",
"-1",
"0",
"1"
]
}
},
{
"name": "AnzahlFall",
"type": "integer",
"description": "Anzahl der gemeldeten Fälle in der entsprechenden Fallgruppe <br> Für NeuerFall = -1, ist die Anzahl negativ: Es handelt sich um eine Korrektur der Fallgruppe, die angibt, wie viele Infektionen zu viel gemeldet worden sind"
},
{
"name": "AnzahlTodesfall",
"type": "integer",
"description": "Anzahl der gemeldeten Todesfälle in der entsprechenden Fallgruppe <br> Für NeuerTodesfall = -1, ist die Anzahl negativ: Es handelt sich um eine Korrektur der Fallgruppe, die angibt, wie viele Todesfälle zu viel gemeldet worden sind"
},
{
"name": "AnzahlGenesen",
"type": "integer",
"description": "Anzahl der genesenen Fälle in der entsprechenden Fallgruppe <br> Für NeuGenesen = -1, ist die Anzahl negativ: Es handelt sich um eine Korrektur der Fallgruppe, die angibt, wie viele genesene Fälle zu viel gemeldet worden sind"
}
]
}
🚀 Let's build a simple data pipeline¶
! pip install sqlalchemy pandas pyarrow
Let's already import all dependencies and define our datastore variables.
from datetime import datetime
from pathlib import Path
import pandas as pd
import sqlalchemy as sa
# “Fake lake” root using medallion folders
LAKE_ROOT = Path("fake-datastore")
BLOB_STORAGE = LAKE_ROOT / "BlobStorage"
BRONZE_DIR = LAKE_ROOT / "bronze" / "rki-infection"
SILVER_DIR = LAKE_ROOT / "silver" / "rki-infection"
GOLD_DIR = LAKE_ROOT / "gold" / "rki-infection"
# Few configs and paths for the data store
DB_PATH = LAKE_ROOT / "rki_project_database.db"
RAW_TABLE = "raw_infection"
SILVER_TABLE = "silver_infection"
GOLD_TABLE = "gold_infection"
Exercise 1.¶
pandas offers numerous functions to read and write data from / to sources.
📊 Part 1: Let's get the data¶
🖊️ Instructions If you're comfortable with Python and pandas. Write a function to download the dataset: deposited on Nov. 16th, 2025. Ensure that the function prints the shape of the dataframe downloaded as well as its name.
💡 Hint. The dataset can be directly downloaded from github using the python pandas package and the dataframe method read_csv(). This method can read data stored in remote repositories. Under the hood, it uses the requests python package to download via HTTPS.
⛔️ Before reading below cells, try to implement by yourself the functions.
# Let's get data deposited on Nov. 16th, 2025
GITHUB_ORG = "https://media.githubusercontent.com/media"
RKI_INFECTION_REPO = "robert-koch-institut/SARS-CoV-2-Infektionen_in_Deutschland"
DATA_SIGNATURE = "77b2ccc3398883786a82f2dcd479424c8a228772"
DATASET_URL = f"{GITHUB_ORG}/{RKI_INFECTION_REPO}/{DATA_SIGNATURE}/Aktuell_Deutschland_SarsCov2_Infektionen.csv"
def fetch_rki_dataset(url: str, output_path: str=None, download_locally: bool=False) -> pd.DataFrame:
"""
Function that download the SARS-CoV-2 Infection data from
RKI repository. The function returns a dataframe
but can also write out an output file.
Params
------
url: str
The URL to the dataset CSV file
output_path: str
Local output destionation for the dataset CSV file
download_locally: bool
Whether or not, the file should be downloaded
Returns
-------
df: pd.DataFrame
RKI dataset as a dataframe
"""
df = pd.read_csv(url)
if output_path != None and download_locally is True:
df.to_csv(path_or_buf=output_path)
print(f"The downloaded dataset: {url.split('/')[-1]} has nrows: {df.shape[0]} and ncols: {df.shape[1]}.")
return df
df = fetch_rki_dataset(
url=DATASET_URL,
output_path=f"rki_{DATA_SIGNATURE}_dataset.csv",
download_locally=True
)
The downloaded dataset: Aktuell_Deutschland_SarsCov2_Infektionen.csv has nrows: 7693423 and ncols: 12.
👀 Part 2: Let's have a quick look at the data¶
🖊️ Instructions. Get to know your dataset. Explore the following points and other relevant information you might want to gather on the dataset.
- How many variables (or columns) do we have?
- How many data points (or rows) do we have?
- How many missing values do we have?
- What is the data types of each variables? Do these make sense?
- What transformations should we make to the data before it's storage and analysis?
💡 Hint. All these questions can be answered using pandas. With your df dataframe, you can apply various methods to derive the basic statistics on your dataset. Just remember that for Data Engineering you don't need to run extensive analysis on the data. You just want to make sure the data you have is what you expect and have an understanding of its content.
🪨 Part 3: Let's store raw data first.¶
🖊️ Instructions. Before we do anything, let's save the raw data in a SQL database. This will make sure that we have an immutable copy of the data. We can always fallback to the raw data to recreate other datasets. Using sqlachemy, create a function that takes a dataframe and saves it into a SQL table. The DB can be created under the following path: DB_PATH.
💡 Hint. We will create and use a local sqlite database. Your function will connect to the database using sqlachemy.create_engine(f"sqlite:///{DB_PATH}")
Why do we use SQL database:
- allows the use of SQL querying on the dataset during retrieval.
- gives us some flexibility and we can extract only the data of interest.
⛔️ Before reading below cells, try to implement by yourself the functions.
def store_df_to_sqlite(df: pd.DataFrame, db_path: str, table_name: str) -> int:
"""
Function that Appends/replaces data into SQLite using SQLAlchemy.
Params
------
df: pd.DataFrame
Dataset to write in the database
db_path: str
Absolute path the the SQL database
table_name: str
Name of the destination table
Returns
-------
count: int
The number of rows written in the destination table
"""
engine = sa.create_engine(f"sqlite:///{db_path}")
with engine.begin() as conn:
df.to_sql(table_name, con=conn, if_exists="append", index=False)
results = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table_name}"))
count = results.scalar()
return count
nrows_written = store_df_to_sqlite(df=df, db_path=DB_PATH, table_name=RAW_TABLE)
print(f"nrows = {nrows_written} were written under table: {RAW_TABLE}")
🥉 Part 4: Raw data to parquet file -- Bronze¶
🖊️ Instructions. Save the same dataframe into the Blob Storage as a parquet file, under the bronze directory (BRONZE_DIR). Moreover, the parquet file should be saved in a directory with today's date formatted as YYYY-MM-DD.
💡 Hint. You can save dataframes into parquet using pd.to_parquet(). This step is pretty straightforward and seems a little redundant as we already saved it in the database. But, often times, it's easier to load a parquet file compared to extracting data from database, especially if you're not familiar with SQL. Moreover, the data will be used by a wider team and actually not DE who are expected to know SQL.
⛔️ Before reading below cells, try to implement by yourself the functions.
def save_df_to_parquet(df: pd.DataFrame, dir_path: Path) -> Path:
"""
Function to save a dataframe as a parquet file in a timestamped directory
Params
------
df: pd.Dataframe
Dataset to save as a parquet
dir_path: Path
Directory where the dataset should be saved.
A subfolder with today's date will be created at this location.
Return
------
out_dir: Path
Exact folder where the dataset is saved
"""
load_date = datetime.utcnow().strftime("%Y-%m-%d")
out_dir = dir_path / f"{load_date}"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "data.parquet"
df.to_parquet(out_path, index=False)
return out_path
bronze_out = save_df_to_parquet(df=df, dir_path=BRONZE_DIR)
🥈 Part 5: Data Transformation -- Silver¶
🖊️ Instructions. Let's create the silver version of the infection dataset. Write for each of of these points, write a function that takes a dataframe and returns the transformed dataframe
- translate column from German to English and write column name in snake_case
- ensure that the datatypes are correct
- drop rows with
unbekanntorNA
Finally, write the silver dataset in the SQL database under the SILVER_TABLE name and then in the blob storage under the
💡 Hint To ensure that you're not modifying the original dataframe, use the following instruction before applying any transformation: new_df = original_df.copy(deep=True).
⛔️ Before reading below cells, try to implement by yourself the functions.
def translate_column_name(df: pd.DataFrame) -> pd.DataFrame:
"""
Function that translates column names from German to English
Params
------
df: pd.DataFrame
Returns
-------
df_new: pd.DataFrame
"""
de2en = {
"IdLandkreis": "county_id",
"Altersgruppe": "age_group",
"Geschlecht": "gender",
"Meldedatum": "reporting_date",
"Refdatum": "ref_date",
"IstErkrankungsbeginn": "disease_onset",
"NeuerFall": "new_case",
"NeuerTodesfall": "new_death",
"NeuGenesen": "new_recovered",
"AnzahlFall": "number_of_cases",
"AnzahlTodesfall": "number_of_deaths",
"AnzahlGenesen": "number_of_recovered",
}
df_new = df.copy(deep=True)
df_new.rename(columns=de2en, inplace=True)
return df_new
def fix_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""
Function that enforces data types on each column
Params
------
df: pd.DataFrame
Returns
-------
df_new: pd.DataFrame
"""
dtype_mapping = {
"county_id": "int32",
"age_group": "str",
"gender": "str",
"reporting_date": "datetime",
"ref_date": "datetime",
"disease_onset": "boolean",
"new_case": "int32",
"new_death": "int32",
"new_recovered": "int32",
"number_of_cases": "int32",
"number_of_deaths": "int32",
"number_of_recovered": "int32",
}
df_new = df.copy(deep=True)
for col, dtype in dtype_mapping.items():
if dtype == "datetime":
df_new[col] = pd.to_datetime(df_new[col], errors="coerce")
else:
df_new[col] = df_new[col].astype(dtype)
return df_new
def drop_columns_with_missing_data(df: pd.DataFrame, list_flaged_word: list=["unbekannt"]) -> pd.DataFrame:
"""
Function to clean dataset. Removes cols with missing information.
Params
------
df: pd.DataFrame
Input dataframe, e.g. bronze dataset
list_flaged_word: list
Words that are used to filter out some rows
Returns
-------
df_new: pd.DataFrame
Transformed dataframe
"""
df_new = df.dropna().copy(deep=True)
df_new = df_new[~df_new.isin(list_flaged_word).any(axis=1)]
print(f"After filtering, silver_df has: {df_new.shape[0] - df.shape[0]} row less.")
return df_new
def transform_bronze_to_silver(br_df: pd.DataFrame) -> pd.DataFrame
"""
Function to run all transformations to create silver dataset.
"""
silver_df = translate_column_name(df=br_df)
silver_df = fix_dtypes(df=silver_df)
silver_df = drop_columns_with_missing_data(df=silver_df)
return silver_df
def save_silver_to_parquet(df: pd.DataFrame, silver_dir: Path=SILVER_DIR) -> Path:
"""
Write silver data as Parquet to silver layer (partitioned by load_date).
"""
silver_out_path = save_df_to_parquet(df=df, dir_path=silver_dir)
return silver_out_path
🥇 Part 5: Data Aggregation -- Gold¶
We will skip this part in the interest of time. The aim of the gold dataset is to provide business value. This step is usually done with an extended team involving data scientists, stakeholders and data engineers.
As for the silver layer, this dataset once created (it could broken down into multiple datasets) wold be store in the database and the blob storage.