Module 4: Pipeline Design and Orchestration with Prefect
Author: Mariama Jaiteh, Helmholtz Centre for Infection Research, NFDI4Immuno
Prefect
Prefect is a data workflow orchestration engine that lets you write workflows in native, modern Python. With a workflow orchestration tool, you can create pipelines to automate and manage your tasks.
Prefect tasks are cacheable and retryable units of work that perform one action in your pipeline. Tasks are like normal functions but with extra capabilities such as metadata, states, and caching. Tasks can be part of other tasks and flows. Tasks carry the @task decorator.
Prefect flows are the components that perform the work as a workflow, i.e. they orchestrate how tasks run. A flow comprises a set of tasks or other flows. Like tasks, flows are similar to standard Python functions but augmented with capabilities such as metadata, states, and caching. Flows carry the @flow decorator.
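To make this concrete before we build the real pipeline, here is a minimal, self-contained sketch of the pattern: one task wrapped in one flow. The names greet and hello_flow are purely illustrative and are not part of the workshop code.
from prefect import flow, task

@task
def greet(name: str) -> str:
    # A task: one unit of work, with Prefect metadata, states, retries, and caching available
    return f"Hello, {name}!"

@flow
def hello_flow(name: str = "NFDI4Immuno"):
    # A flow: orchestrates one or more tasks (or other flows)
    message = greet(name)
    print(message)

hello_flow()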
Let's build a data pipeline with Prefect
Let's install the dependencies
! pip install prefect
Quick How-to
In the previous notebook, Module_2_Simple_Data_Engineering, we created several functions to ingest and transform the data. We now want to put all of that into a Prefect pipeline.
So first, let's take a look at the function that downloads data from the RKI repository and convert it into a task.
It's very simple: we add the @task decorator just above the function definition. fetch_rki_dataset() is now a Prefect task.
@task
def fetch_rki_dataset(url: str, output_path: str = None, download_locally: bool = False) -> pd.DataFrame:
    """
    Function that downloads the SARS-CoV-2 infection data from
    the RKI repository. The function returns a dataframe
    but can also write out an output file.
    Params
    ------
    url: str
        The URL to the dataset CSV file
    output_path: str
        Local output destination for the dataset CSV file
    download_locally: bool
        Whether or not the file should be downloaded
    Returns
    -------
    df: pd.DataFrame
        RKI dataset as a dataframe
    """
    df = pd.read_csv(url)
    if output_path is not None and download_locally:
        df.to_csv(path_or_buf=output_path)
    print(f"The downloaded dataset: {url.split('/')[-1]} has nrows: {df.shape[0]} and ncols: {df.shape[1]}.")
    return df
Now, let's create a small pipeline to ingest the raw data into our SQL database.
In addition to the download task, we will need to create a task to save the downloaded data into the database.
Similarly, we will add the decorator @task above the function store_df_to_sqlite() to convert it into a task.
@task
def store_df_to_sqlite(df: pd.DataFrame, db_path: str, table_name: str) -> int:
    """
    Function that appends data to an SQLite table using SQLAlchemy.
    Params
    ------
    df: pd.DataFrame
        Dataset to write in the database
    db_path: str
        Absolute path to the SQLite database
    table_name: str
        Name of the destination table
    Returns
    -------
    count: int
        The number of rows in the destination table after writing
    """
    engine = create_engine(f"sqlite:///{db_path}")
    with engine.begin() as conn:
        df.to_sql(table_name, con=conn, if_exists="append", index=False)
        results = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
        count = results.scalar()
    return count
Finally, the flow will be as follows:
- First, we download the dataset from the GitHub repository as a dataframe, then print some basic info about what has been downloaded
- Second, we write the dataframe into the SQL database under the RAW_TABLE table.
Here is how it looks in a Prefect flow:
@flow(name="raw_data_ingestion")
def ingest_raw_data():
    raw_df = fetch_rki_dataset(url=DATASET_URL)
    print(f"The Dataset {DATASET_URL.split('/')[-1]} is downloaded. It has ncols={raw_df.shape[1]} and nrows={raw_df.shape[0]}")
    nrows_written = store_df_to_sqlite(df=raw_df, db_path=DB_PATH, table_name=RAW_TABLE)
    print(f"nrows = {nrows_written} were written under table: {RAW_TABLE}")
Prefect Dashboard: Let's have a look
Open a terminal and run the following command:
prefect server start
Then open the dashboard at the proposed link, e.g. http://127.0.0.1:4200
There you will see the tasks and flows you have defined. The UI shows you the state of each task and run, and allows you to create automations, among other things.
Part 1: Let's create a simple workflow for the ingestion
Instructions: Reusing the download function and any function you find relevant, create tasks to:
- download the infection dataset
- save the dataset in the SQL database under RAW_TABLE
- save the dataset in the blob storage under BRONZE_DIR
Once the tasks have been implemented, create a flow that runs each step sequentially.
Hint: Prefect tasks behave like normal Python functions and carry the @task decorator. Tasks are called sequentially inside flows the same way you would call functions inside a wrapper function. Flows carry the @flow decorator.
Before reading the cells below, try to implement the functions yourself.
from prefect import flow, task
from datetime import datetime
from pathlib import Path
import pandas as pd
from sqlalchemy import create_engine, text
# "Fake lake" root using medallion folders
LAKE_ROOT = Path("fake-datastore")
BLOB_STORAGE = LAKE_ROOT / "BlobStorage"
BRONZE_DIR = BLOB_STORAGE / "bronze" / "rki-infection"
SILVER_DIR = BLOB_STORAGE / "silver" / "rki-infection"
GOLD_DIR = BLOB_STORAGE / "gold" / "rki-infection"
# A few configs and paths for the data store
DB_PATH = LAKE_ROOT / "rki_project_database.db"
RAW_TABLE = "raw_infection"
SILVER_TABLE = "silver_infection"
GOLD_TABLE = "gold_infection"
# Dataset CSV hosted on GitHub; here we take the latest data from the main branch of the RKI repository
GITHUB_ORG = "https://media.githubusercontent.com/media"
RKI_INFECTION_REPO = "robert-koch-institut/SARS-CoV-2-Infektionen_in_Deutschland"
DATA_SIGNATURE = "refs/heads/main"
DATASET_URL = f"{GITHUB_ORG}/{RKI_INFECTION_REPO}/{DATA_SIGNATURE}/Aktuell_Deutschland_SarsCov2_Infektionen.csv"
@task(retries=2, retry_delay_seconds=10, log_prints=True)
def fetch_rki_dataset(url: str, output_path: str = None, download_locally: bool = False) -> pd.DataFrame:
    """
    Function that downloads the SARS-CoV-2 infection data from
    the RKI repository. The function returns a dataframe
    but can also write out an output file.
    Params
    ------
    url: str
        The URL to the dataset CSV file
    output_path: str
        Local output destination for the dataset CSV file
    download_locally: bool
        Whether or not the file should be downloaded
    Returns
    -------
    df: pd.DataFrame
        RKI dataset as a dataframe
    """
    df = pd.read_csv(url)
    if output_path is not None and download_locally:
        df.to_csv(path_or_buf=output_path)
    print(f"The downloaded dataset: {url.split('/')[-1]} has nrows: {df.shape[0]} and ncols: {df.shape[1]}.")
    return df
@task(log_prints=True)
def ensure_dirs(*dirs: Path) -> None:
    for d in dirs:
        d.mkdir(parents=True, exist_ok=True)
@task(retries=2, retry_delay_seconds=10, log_prints=True)
def store_df_to_sqlite(df: pd.DataFrame, db_path: str, table_name: str) -> int:
    """
    Function that appends data to an SQLite table using SQLAlchemy.
    Params
    ------
    df: pd.DataFrame
        Dataset to write in the database
    db_path: str
        Absolute path to the SQLite database
    table_name: str
        Name of the destination table
    Returns
    -------
    count: int
        The number of rows in the destination table after writing
    """
    engine = create_engine(f"sqlite:///{db_path}")
    with engine.begin() as conn:
        df.to_sql(table_name, con=conn, if_exists="append", index=False)
        results = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
        count = results.scalar()
    return count
@task(retries=2, retry_delay_seconds=10, log_prints=True)
def save_df_to_parquet(df: pd.DataFrame, dir_path: Path) -> Path:
    """
    Function to save a dataframe as a parquet file in a timestamped directory
    Params
    ------
    df: pd.DataFrame
        Dataset to save as a parquet file
    dir_path: Path
        Directory where the dataset should be saved.
        A subfolder with today's date will be created at this location.
    Returns
    -------
    out_path: Path
        Exact path where the dataset is saved
    """
    load_date = datetime.utcnow().strftime("%Y-%m-%d")
    out_dir = dir_path / f"{load_date}"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "data.parquet"
    df.to_parquet(out_path, index=False)
    return out_path
@flow(name="raw_data_ingestion") #, timeout_seconds=20, log_prints=True, retries=2, retry_delay_seconds=15)
def ingest_raw_data():
    raw_df = fetch_rki_dataset(url=DATASET_URL)
    print(f"The Dataset {DATASET_URL.split('/')[-1]} is downloaded. It has ncols={raw_df.shape[1]} and nrows={raw_df.shape[0]}")
    nrows_written = store_df_to_sqlite(df=raw_df, db_path=DB_PATH, table_name=RAW_TABLE)
    print(f"nrows = {nrows_written} were written under table: {RAW_TABLE}")
    ensure_dirs(BRONZE_DIR)
    out_raw_path = save_df_to_parquet(df=raw_df, dir_path=BRONZE_DIR)
    print(f"The Raw data has been saved under the Bronze layer at the following path:\n{out_raw_path}")
    return out_raw_path
flow_ingest_raw_out = ingest_raw_data()
Part 2: Let's create a simple workflow for the transformation
Instructions: Reusing the transformation functions, create tasks to:
- transform the downloaded infection dataset
- save the transformed dataset in the SQL database under SILVER_TABLE
- save the transformed dataset in the blob storage under SILVER_DIR
Once the tasks have been implemented, create a flow that runs each step sequentially.
Before reading the cells below, try to implement the functions yourself.
def translate_column_name(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that translates column names from German to English
    Params
    ------
    df: pd.DataFrame
    Returns
    -------
    df_new: pd.DataFrame
    """
    de2en = {
        "IdLandkreis": "county_id",
        "Altersgruppe": "age_group",
        "Geschlecht": "gender",
        "Meldedatum": "reporting_date",
        "Refdatum": "ref_date",
        "IstErkrankungsbeginn": "disease_onset",
        "NeuerFall": "new_case",
        "NeuerTodesfall": "new_death",
        "NeuGenesen": "new_recovered",
        "AnzahlFall": "number_of_cases",
        "AnzahlTodesfall": "number_of_deaths",
        "AnzahlGenesen": "number_of_recovered",
    }
    df_new = df.copy(deep=True)
    df_new.rename(columns=de2en, inplace=True)
    return df_new
def fix_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that enforces data types on each column
    Params
    ------
    df: pd.DataFrame
    Returns
    -------
    df_new: pd.DataFrame
    """
    dtype_mapping = {
        "county_id": "int32",
        "age_group": "str",
        "gender": "str",
        "reporting_date": "datetime",
        "ref_date": "datetime",
        "disease_onset": "boolean",
        "new_case": "int32",
        "new_death": "int32",
        "new_recovered": "int32",
        "number_of_cases": "int32",
        "number_of_deaths": "int32",
        "number_of_recovered": "int32",
    }
    df_new = df.copy(deep=True)
    for col, dtype in dtype_mapping.items():
        if dtype == "datetime":
            df_new[col] = pd.to_datetime(df_new[col], errors="coerce")
        else:
            df_new[col] = df_new[col].astype(dtype)
    return df_new
def drop_columns_with_missing_data(df: pd.DataFrame, list_flagged_words: list = ["unbekannt"]) -> pd.DataFrame:
    """
    Function to clean the dataset. Removes rows with missing information
    or flagged placeholder values.
    Params
    ------
    df: pd.DataFrame
        Input dataframe, e.g. bronze dataset
    list_flagged_words: list
        Words that are used to filter out some rows
    Returns
    -------
    df_new: pd.DataFrame
        Transformed dataframe
    """
    df_new = df.dropna().copy(deep=True)
    df_new = df_new[~df_new.isin(list_flagged_words).any(axis=1)]
    print(f"After filtering, silver_df has {df.shape[0] - df_new.shape[0]} fewer rows.")
    return df_new
@task
def read_data_from_blob(dir_path: Path) -> pd.DataFrame:
    """
    Function to read a dataframe from a parquet file.
    Params
    ------
    dir_path: Path
        Directory where the dataset should be read from. This path does not include the
        timestamped subfolder with today's date. Here we make the assumption that
        we always want to read today's data.
    Returns
    -------
    read_df: pd.DataFrame
        The dataset that was read
    """
    load_date = datetime.utcnow().strftime("%Y-%m-%d")
    in_path = dir_path / f"{load_date}" / "data.parquet"
    read_df = pd.read_parquet(path=in_path)
    print(f"Loaded the dataset located at: {in_path}")
    return read_df
@task
def transform_bronze_to_silver(br_df: pd.DataFrame) -> pd.DataFrame:
    """
    Function to run all transformations to create the silver dataset.
    """
    silver_df = translate_column_name(df=br_df)
    silver_df = fix_dtypes(df=silver_df)
    silver_df = drop_columns_with_missing_data(df=silver_df)
    print("Bronze dataset -> Silver dataset!")
    return silver_df
@flow(name="raw-data-transformation")
def bronze_to_silver():
    bronze_df = read_data_from_blob(dir_path=BRONZE_DIR)
    print(f"Bronze dataset has nrows: {bronze_df.shape[0]} and ncols: {bronze_df.shape[1]}")
    silver_df = transform_bronze_to_silver(br_df=bronze_df)
    print(f"Silver dataset has nrows: {silver_df.shape[0]} and ncols: {silver_df.shape[1]}")
    silver_out_path = save_df_to_parquet(df=silver_df, dir_path=SILVER_DIR)
    print(f"The transformed data has been saved under the Silver layer at the following path:\n{silver_out_path}")
    nrows_written = store_df_to_sqlite(df=silver_df, db_path=DB_PATH, table_name=SILVER_TABLE)
    print(f"nrows = {nrows_written} were written under table: {SILVER_TABLE}")
    return silver_out_path
flow_bronze_to_silver_out = bronze_to_silver()
Part 3: Let's create a combined workflow with both ingestion and transformation
Instructions: Reusing the previously defined flows or tasks, create a single flow that runs the full ELT pipeline from raw to silver.
Before reading the cells below, try to implement the functions yourself.
@flow(name="raw_to_silver")
def raw_to_silver():
    bronze_path = ingest_raw_data()
    silver_path = bronze_to_silver()
raw_to_silver()
Part 4: From Notebooks to Automation - Deploying Your Pipeline
So far, we've built and tested our data pipeline interactively.
In real-world scenarios, a data engineer doesn't want to run these flows manually every day.
In production-grade settings, pipelines must run on time and consistently. That's where deployment and automation come in.
Why automate?
Automation removes manual triggers and ensures your pipelines run:
- On a schedule (e.g. daily at 6 AM)
- In a controlled environment
- With consistent dependencies and configurations
Prefect allows you to package, deploy, and schedule your pipelines.
Once deployed, Prefect takes care of the orchestration: triggering, logging, retries, etc.
How Prefect Deployments Work
When you create a deployment for a flow:
- Prefect builds a Docker image containing:
  - Your flow code
  - Dependencies (defined in requirements.txt or pyproject.toml)
- The image is pushed to your container registry (e.g. DockerHub, Quay, GitLab Registry).
- During scheduled or triggered runs, the image is pulled from the registry and executed in a container by your Prefect worker pool.
This ensures:
- Reproducibility: every run uses the same code and environment
- Speed: images are reused for future runs since they are pulled from the container registry
- Scalability: flows can run on any worker or infrastructure
Example: Deploying our RKI Infection raw-to-silver flow with Prefect
Here is a simple example of how a flow can be turned into a deployment:
from prefect.client.schemas.schedules import CronSchedule
raw_to_silver.deploy(
    name="rki-raw-to-silver",
    parameters={},
    schedule=CronSchedule(cron="0 6 * * *", timezone="Europe/Stockholm"),
    work_pool_name="default-agent-pool",
    tags=["rki", "automation"]
)
Unfortunately, we will not test this code, as it needs more dependencies and more time. But I hope this workshop made you curious about how to automate your data-related workflows.
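If you would like to experiment with scheduling locally, without Docker or a container registry, recent Prefect versions also offer flow.serve(), which keeps a small process running on your machine and executes scheduled runs there. A hedged sketch, reusing the raw_to_silver flow defined above (the deployment name and tags are illustrative):
# Sketch only: serve the flow from a local long-running process instead of a Docker deployment
raw_to_silver.serve(
    name="rki-raw-to-silver-local",
    cron="0 6 * * *",  # run every day at 06:00
    tags=["rki", "local"],
)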
So, keep going!
Module 5: Data Quality Monitoring & Pipeline Management
After pipelines are automated and deployed in production, the next important phases are ensuring data quality and managing pipelines at scale.
Data Quality Monitoring
Once a pipeline is running, we need to ensure that the produced datasets are accurate, consistent, and reliable.
Data quality monitoring focuses on detecting anomalies early, before the data is consumed by analysts and modellers.
Typical data quality checks include:
- Schema validation: verifying columns, types, and allowed ranges
- Completeness: ensuring no unexpected null or missing values
- Uniqueness: detecting duplicates in identifiers
- Integrity: confirming relationships between datasets (e.g., foreign keys)
- Freshness: verifying data is updated on schedule
An example of such a framework is Great Expectations, which can be used to define and validate "expectations" for datasets.
Data quality monitoring tools can be used as part of your flow to catch problems right after ingestion and transformation.
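As a small illustration of what such checks could look like inside a Prefect flow, here is a hedged sketch using plain pandas rather than Great Expectations. The expected columns and the 30-day freshness threshold are illustrative assumptions, not part of the workshop code:
from datetime import datetime

import pandas as pd
from prefect import task

@task(log_prints=True)
def validate_silver_dataset(df: pd.DataFrame) -> None:
    """Illustrative data quality checks; raising an error fails the task (and flow) run."""
    # Schema validation: the columns we rely on downstream must be present
    expected_columns = {"county_id", "age_group", "gender", "reporting_date", "number_of_cases"}
    missing = expected_columns - set(df.columns)
    if missing:
        raise ValueError(f"Schema check failed, missing columns: {missing}")
    # Completeness: key columns must not contain nulls
    if df["county_id"].isna().any():
        raise ValueError("Completeness check failed: null values in county_id")
    # Freshness (illustrative threshold): the newest report should not be too old
    age_days = (pd.Timestamp(datetime.utcnow()) - df["reporting_date"].max()).days
    if age_days > 30:
        raise ValueError(f"Freshness check failed: newest record is {age_days} days old")
    print("All data quality checks passed.")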
Pipeline Management & Observability
As pipelines grow more complex, managing them becomes as important as building them.
Pipeline management involves tracking, observing, and optimizing every run.
Modern orchestration tools like Prefect, Airflow, and Dagster provide:
- Centralized dashboards for runs, logs, and task performance
- Retry and failure alerts, e.g. via email or Slack (see the sketch below)
- Versioning and reproducibility for data and code
- Dynamic scaling: workers automatically handle more flows as data volume grows
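For example, Prefect flows accept on_failure hooks that are called when a flow run ends in a failed state. Below is a hedged sketch that only logs a message; in a real setting the hook body would call your email or Slack integration. The hook and flow names are illustrative, and the sketch reuses the ingestion and transformation flows defined earlier in this notebook:
from prefect import flow

def notify_on_failure(flow, flow_run, state):
    # Illustrative alert: replace this print with an email/Slack call in production
    print(f"Flow run '{flow_run.name}' of flow '{flow.name}' failed with state: {state.name}")

@flow(name="raw_to_silver_monitored", retries=1, retry_delay_seconds=30, on_failure=[notify_on_failure])
def raw_to_silver_monitored():
    ingest_raw_data()
    bronze_to_silver()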
Ensuring data quality and managing pipelines well are the foundation of a reliable, production-grade data pipeline.