
# Build and Deploy an ETL Pipeline with Python
This hands-on lab provides a step-by-step approach to building an ETL (Extract, Transform, Load) pipeline using Python. Participants will learn how to efficiently extract data from various sources, apply transformations for data cleaning and processing, and load the structured data into target systems for storage and analytics. By completing this lab, learners will gain practical experience in building an end-to-end ETL pipeline, preparing them to handle real-world data engineering tasks efficiently.

# Challenge: Introduction to Build and Deploy an ETL Pipeline with Python
In this lab, you'll explore various techniques to build Python ETL pipelines. By leveraging Python for ETL pipelines, you can create scripted logic that is both straightforward to comprehend and compose, while tapping into the extensive range of Python modules to meet your data processing requirements.
## Learning Objectives

- Understand the three phases of ETL pipelines.
- Implement techniques to extract data using key Python ETL libraries such as `pandas`, `Requests`, and `SQLAlchemy`.
- Transform your data while it's in memory, then load the data into a destination database.

To get started with this lab, click the `>` button to proceed to the next step!
# Challenge: Extracting Data

## Extract Phase
The extract phase is the first step in an ETL pipeline, where data is collected from various sources for further processing. During this phase, raw data is retrieved from flat files, databases, APIs, or other systems, often in different formats and structures. The goal is to gather all the necessary information efficiently and accurately, ensuring that downstream processes have the required input.
In this task you will learn how to:
- Read various raw data files into Pandas
- Read data into Pandas from a SQL database
- Read data into Pandas from REST APIs
To get started, open the `extract.py` file. A SQLAlchemy engine with a connection to the MySQL database has already been created at the top of the script.
## Task 1.0.1
### Extracting Data from Flat Files
Pandas supports reading flat files such as the following:

- `.csv`
- `.xlsx`
- `.json`
When Pandas reads the file, the contents are written to a dataframe in memory. Pandas dataframes are similar to SQL tables and are a great way to hold data while you clean and process it.
For this task:
- Read the `data/example.csv` file into `df1` with the `pd.read_csv` function.
- Read the `data/example.xlsx` file into `df2` with the `pd.read_excel` function.
- Read the `data/example.json` file into `df3` with the `pd.read_json` function.
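If you want to sanity-check your work, a minimal sketch of these reads might look like the following (paths follow the task above; the sketch assumes `pandas` is imported in `extract.py` as `pd` and that an Excel engine such as `openpyxl` is available in the lab environment):

```python
import pandas as pd

# Read each flat file into its own dataframe
df1 = pd.read_csv("data/example.csv")     # comma-separated values
df2 = pd.read_excel("data/example.xlsx")  # Excel workbook (needs an Excel engine such as openpyxl)
df3 = pd.read_json("data/example.json")   # JSON records

print(df1.head())
```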
## Task 1.0.2
### Data Loading with Dask
Dask is a powerful parallel processing framework. Dask not only lets you process your workloads faster, it also lets you work with more data than fits in your machine's memory. It's great when you have a really big Pandas dataframe and are doing lots of processing on the data in memory.
Let's do a simple exercise to see how you can load several files into a Dask dataframe. Note that `dask.dataframe` has been imported under the alias `dd`.

1. Observe the `dask_data_` files in the `data` folder.
2. Assign the `df4` variable the value from the following code snippet:

   ```python
   dd.read_csv("dask_data_*.csv")
   ```

   Note the wildcard syntax in the file name (`*`). This specifies that any file with the name prefix `dask_data_` and an extension of `.csv` will be read into the dataframe.
3. Optionally, use the `get_memory_size` and `get_shape` functions to view metadata about the Dask dataframe.
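As a rough sketch of the pattern (the path is a placeholder and depends on where the script is run from; `get_memory_size` and `get_shape` are lab-provided helpers, so they are not shown here):

```python
import dask.dataframe as dd

# Read every file matching the wildcard into a single lazy Dask dataframe
df4 = dd.read_csv("dask_data_*.csv")

# Dask is lazy: data is only read when a result is requested
print(df4.head())        # triggers a read of the first partition
print(df4.npartitions)   # number of underlying partitions/files
```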
## Task 1.0.3
### Extract Data from Database Tables
Pandas can also read data directly from database tables using `pd.read_sql()` together with SQLAlchemy connectors. These SQLAlchemy connections are referred to as engines.

In this challenge you will learn how to connect to a database and use a raw SQL query to return results to a Pandas dataframe. This challenge uses a locally hosted MySQL database; the connection credentials have already been provided at the top of the `extract.py` script.

Read a database query into a Pandas dataframe:
- Create a SQLAlchemy engine and point it to the `data/sales.db` SQLite database.
- Assign the following SQL query to the `query` variable: `query = "SELECT TABLE_NAME FROM TABLES"`
- Assign the two required parameters to the `pd.read_sql` function, which are `query` and `con`. `con` will be the `engine` defined at the top of the script that connects to MySQL.
- Use the `pd.read_sql` function to read the data into `df5`.
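For reference, a minimal sketch of reading a query result through a SQLAlchemy engine (the connection string below is a generic placeholder, not the lab's actual credentials, which are already defined in `extract.py`):

```python
import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string -- extract.py provides the real engine
engine = create_engine("mysql+pymysql://user:password@localhost:3306/information_schema")

query = "SELECT TABLE_NAME FROM TABLES"

# read_sql sends the query through the engine and returns the result set as a dataframe
df5 = pd.read_sql(query, con=engine)
print(df5.head())
```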
## Task 1.0.4
### Extracting REST API Data with Requests
The Requests library allows you to read data from a web endpoint and store that data as JSON. Requests is helpful for building ETL pipelines because it can talk to different services via REST API endpoints. Let's use Requests to get data from a locally running REST API:
1. Click the `+` icon next to the `Terminal` and select `New tab`.
2. On the `New tab` modal, select `Terminal`.
3. In the terminal, execute the following command: `python3 scripts/app.py`. This command starts the Flask app, which hosts multiple REST API endpoints.
4. Specify the following `url` variable for the REST API endpoint: `http://localhost:5000/random-user`.
5. To make a `GET` request and print the response from the REST API, use the following code:

   ```python
   r = requests.get(url)
   print(r.json())
   ```

## Task 1.0.5
### Authenticating with Requests
Most REST APIs require authentication, and there are many forms of REST API authentication; in this challenge you will use a Bearer token. To do this you will need to pass an additional `headers` variable to the request that contains the token.

Add a bearer token to the requests `GET` request and authenticate to the `/protected` endpoint.

1. In the `headers` dictionary variable, add the key `Authorization`.
2. In the `headers` dictionary variable, add the value `Bearer mysecrettoken123`.
3. Assign the variable `response` to the result of the following GET request:

   ```python
   response = requests.get("http://localhost:5000/protected", headers=headers)
   ```
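Putting the pieces together, a minimal sketch of the authenticated request might look like this (the token and endpoint come from the task above):

```python
import requests

# Bearer token header expected by the /protected endpoint
headers = {"Authorization": "Bearer mysecrettoken123"}

response = requests.get("http://localhost:5000/protected", headers=headers)
print(response.status_code)
print(response.json())
```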
## (Optional)
### Loading Data with AIOHTTP
In this task you will compare the execution time of running similar workloads with the `non-async.py` (Requests) and `async.py` (AIOHTTP) scripts.

1. In the `scripts/` directory, open both the `async.py` and `non-async.py` Python files.
2. Observe the difference in structure. Notice that the `async.py` file has the keyword `async` in front of the `main()` function and that `aiohttp.ClientSession()` is used with the `async with` keyword.
3. Run both scripts with the following commands and observe the difference in execution time. Feel free to increase `num_iterations` to larger values to experiment.

To run the `async.py` file: `python3 scripts/async.py`

To run the `non-async.py` file: `python3 scripts/non-async.py`
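The lab's `async.py` file defines the real workload; the snippet below is only a rough sketch of the async pattern it describes (the endpoint and iteration count are placeholders, not the script's actual values):

```python
import asyncio
import aiohttp

async def fetch(session, url):
    # Each request is awaited, so other requests can proceed while this one waits on I/O
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    url = "http://localhost:5000/random-user"  # placeholder endpoint from the earlier task
    num_iterations = 20                        # placeholder; the lab script defines its own value
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, url) for _ in range(num_iterations)))
    print(f"Fetched {len(results)} responses concurrently")

asyncio.run(main())
```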
# Challenge: Transforming Data

## Transform Phase
The Transform phase is where raw data is refined, structured, and enriched to meet business needs before being loaded into a target system.
This stage involves various data activities including:
- Data cleansing
- Deduplication
- Standardization
- Aggregation
- Applying business logic
- Joining
Transformations can range from simple conversions, such as changing date formats, to complex operations like joining multiple datasets, or calculating key performance indicators.
In this task you will learn how to:
- Apply different data conversion and translation to Pandas data.
- Query, filter, and aggregate Pandas data.
- Join multiple Pandas dataframes together with `pd.merge`.

To get started, open the `transform.py` file.

## Task 2.0.1
### Convert Pandas Column Data Types
Remember that Pandas will infer data types by default. This is fine for most cases, but sometimes you will want to define data types explicitly or convert them.
1. Observe the `data` dictionary. Notice that the `ZipCode` column is an integer.
2. Convert the `ZipCode` column to a type of `string`.
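A minimal sketch of this kind of conversion (assuming `df` has already been built from the `data` dictionary in `transform.py`):

```python
# Convert the integer ZipCode column to strings so formatting is preserved
df["ZipCode"] = df["ZipCode"].astype(str)
print(df.dtypes)
```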
## Task 2.0.2
### Convert Datetime Columns in Pandas
Next, we will convert the `Birthdate` column.

1. Observe the current format of the `Birthdate` column. You can add a `print(df)` statement to see the values of the columns.
2. The column is in `yyyy-mm-dd` format. We would like to convert it to `mm-dd-yyyy` format.
3. Assign the converted value back to the `Birthdate` column.
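One way to approach this, as a rough sketch (assuming `Birthdate` currently holds `yyyy-mm-dd` strings):

```python
# Parse the ISO-style dates, then re-format them as mm-dd-yyyy strings
df["Birthdate"] = pd.to_datetime(df["Birthdate"], format="%Y-%m-%d").dt.strftime("%m-%d-%Y")
print(df["Birthdate"].head())
```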
## Task 2.0.3
### Using `map` to Replace Dataframe Values

Next we will learn about the `df.replace()` function. This function allows you to specify a dictionary of key-value pairs and replace existing dataframe values.

1. Observe the `replace_map` dictionary. In this dictionary, the value being replaced is specified first, followed by its replacement.
2. Write a `df.replace()` statement that uses the `df` column `City` and replaces its values with the ones specified in `replace_map`.
3. Remember to assign the value of the `df.replace()` command back to the `df` variable.
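As a rough sketch (the keys below are placeholders; the lab's `replace_map` defines the real old-to-new values):

```python
# Placeholder mapping -- transform.py defines the real replace_map
replace_map = {"NYC": "New York", "LA": "Los Angeles"}

# Replace values only in the City column, then assign the result back to df
df = df.replace({"City": replace_map})
print(df["City"].unique())
```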
## Task 2.0.4
### Using Regex to Replace Dirty Data
You can also replace specific dataframe row values by specifying regex patterns. Regex is a string search pattern language that can identify specific strings in text. You can use regex to find matching strings within your dataframes and replace or extract those values.
For example, the following code snippet shows how to replace all `@@@@` values within a dataframe with `####`:

```python
df['Column'] = df['Column'].str.replace(r'\@\@\@\@', '####', regex=True)
```

1. Observe the `CharacterSeq` column in the dataframe.
2. Write a regex pattern that identifies occurrences of four `*` characters in a row.
3. Use the `df['Column'] = df['Column'].str.replace()` pattern with your regex to replace the occurrences of four asterisk characters (`****`) with four underscores (`____`).
## (Optional)
### Dataframe Filtering
There are multiple ways to filter dataframe rows. One is the `df.query()` function, for example:

```python
df.query('Name == "Adam"')
```

Another is the self-referencing syntax:

```python
df[df.Name == "Adam"]
```

A filtered dataframe can also be assigned to another dataframe variable. This is useful for generating reports or sub-datasets that only include specific values. For example:
```python
test = df[df['Name'] == "Adam"]
test
```

## Task 2.0.5
### Creating Custom Calculated Columns

You can write your own functions to translate data in dataframes based on existing values. You can use a lambda function to pass an existing data value to your function, similar to how you call a regular Python function. (A minimal sketch of this pattern appears after the aggregation examples below.)

1. Observe the `cost_of_living_adjust` function. This function takes an `original` value and an `inc_percent`, or incremental percentage.
2. Create a new column called `Salary_COL` and assign it the values of the `cost_of_living_adjust` function based on the existing `Salary` column and a `5` percent increase.

## (Optional)
### Dataframe Aggregating

You can aggregate the values of your dataframe easily with the `.agg` function. `.agg` accepts keywords such as:

- `count`
- `min`
- `max`
- `mean`
- `sum`

To apply an aggregate to your dataframe, specify the column name and the aggregate(s) you want to apply, for example:

```python
df.agg({'Salary' : ['count','min', 'max', 'mean', 'sum']})
```
You can also use `.groupby` syntax in combination with `.agg`:

```python
df.groupby("TShirtSize").agg({"Salary" : ["mean"]})
```
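Returning to Task 2.0.5, here is a minimal sketch of the lambda pattern it describes (the body of `cost_of_living_adjust` below is an assumption for illustration; `transform.py` defines the real function):

```python
# Hypothetical implementation -- transform.py defines the real cost_of_living_adjust
def cost_of_living_adjust(original, inc_percent):
    return original * (1 + inc_percent / 100)

# Apply the function to each Salary value via a lambda and store the result in a new column
df["Salary_COL"] = df["Salary"].apply(lambda original: cost_of_living_adjust(original, 5))
print(df[["Salary", "Salary_COL"]].head())
```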
## Task 2.0.6
### Joining Data in Pandas
Pandas offers the ability to join multiple dataframes together, similar to how you would in a relational database with SQL.
Pandas uses the `.merge()` function to facilitate this process. When using `.merge()` you specify the original dataframe and the joining dataframe, the `on` parameter, which is the column being matched, and the `how` parameter, which is the type of join.

For example, the following snippet joins `df1` and `df2` together based on the matching `ID` columns that are present in both dataframes. It also shows the different types of joins available when using `.merge()`:

```python
# Merge using different join types
inner_merge = pd.merge(df1, df2, on='ID', how='inner')
left_merge = pd.merge(df1, df2, on='ID', how='left')
right_merge = pd.merge(df1, df2, on='ID', how='right')
outer_merge = pd.merge(df1, df2, on='ID', how='outer')
```
For this task:
1. Observe both dataframes (`df` and `df2`).
2. Use the `UserId` field as the `on` parameter.
3. Use `left` as the `how` parameter.
4. Assign the results of the merge to a dataframe named `df_merged`.
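A minimal sketch of the merge described above (assuming both `df` and `df2` contain a `UserId` column, as the task states):

```python
# Left join keeps every row from df and matches rows from df2 on UserId
df_merged = pd.merge(df, df2, on="UserId", how="left")
print(df_merged.head())
```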
# Challenge: Loading Data

## Loading Data
Data writing in Pandas refers to the process of exporting or saving data from a Pandas DataFrame to various file formats or databases. Pandas provides several functions to write data, including `to_csv()`, `to_excel()`, `to_sql()`, and `to_json()`, allowing users to easily export data to formats like CSV, Excel, SQL databases, or JSON.

These functions give you flexibility in saving data in a format that suits your needs, whether for sharing, archiving, or further processing. With options for customization such as delimiter specification, index handling, and database connection parameters, Pandas ensures a seamless experience in saving data to various destinations efficiently.

To get started, open the `load.py` file.

## Task 3.0.1
### Loading Data with a SQLAlchemy Connection
1. Create a dictionary with the following Python code:

   ```python
   data = {
       'Transaction_ID': ['T1001', 'T1002', 'T1003', 'T1004'],
       'Customer_Name': ['Alice Johnson', 'Bob Lee', 'Charlie Kim', 'David Clark'],
       'Product': ['Laptop', 'Smartphone', 'Tablet', 'Smartwatch'],
       'Quantity': [1, 2, 3, 1],
       'Price_Per_Unit': [1200, 800, 400, 200],
       'Total_Sale': [1200, 1600, 1200, 200],
       'Sale_Date': ['2025-03-20', '2025-03-21', '2025-03-21', '2025-03-22']
   }
   ```

2. Convert the dictionary into a Pandas dataframe variable `df`.
3. Use `df.to_csv` to write the dataframe contents to a `Transactions.csv` file in the `data` directory. Include the following parameter when using the `to_csv` function: `index=False`.
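A rough sketch of steps 2 and 3 (the path assumes the script is run from the lab's working directory):

```python
import pandas as pd

# Build the dataframe from the dictionary defined above
df = pd.DataFrame(data)

# Write it out without the index column
df.to_csv("data/Transactions.csv", index=False)
```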
## (Optional)
### Validating Data Operations
You can use assert statements to validate that a specific data pipeline has finished successfully. Use assert statements to check that row counts, column types, and totals all match the expected results.
For example, to assert that the number of rows read from the file matches the expected value, you could use the following code:
```python
import pandas as pd
from sqlalchemy import create_engine

csv_df = pd.read_csv(csv_file)

assert csv_df.shape[0] == 3, "Row counts do not match expected values!"
print(f"Row counts match: {csv_df.shape[0]} rows")
```