
Build and Deploy an ETL Pipeline with Python

This hands-on lab provides a step-by-step approach to building an ETL (Extract, Transform, Load) pipeline using Python. Participants will learn how to efficiently extract data from various sources, apply transformations for data cleaning and processing, and load the structured data into target systems for storage and analytics. By completing this lab, learners will gain practical experience in building an end-to-end ETL pipeline, preparing them to handle real-world data engineering tasks efficiently.


Path Info

Level: Intermediate
Duration: 1h 3m
Published: Mar 28, 2025


Table of Contents

  1. Challenge

    Introduction to Build and Deploy an ETL Pipeline with Python


    In this lab, you'll explore various techniques to build Python ETL pipelines. By leveraging Python for ETL pipelines, you can create scripted logic that is both straightforward to comprehend and compose, while tapping into the extensive range of Python modules to meet your data processing requirements.

    Learning Objectives

    • Understand the three phases of ETL pipelines.
    • Implement techniques to extract data using key Python ETL libraries such as Pandas, Requests, and SQLAlchemy.
    • Transform your data while it's in memory, then load the data into a destination database.

    To get started with this lab, click the > button to proceed to the next step!

  2. Challenge

    Extracting Data

    Extract Phase

    The extract phase is the first step in an ETL pipeline, where data is collected from various sources for further processing. During this phase, raw data is retrieved from flat files, databases, APIs, or other systems, often in different formats and structures. The goal is to gather all the necessary information efficiently and accurately, ensuring that downstream processes have the required input.

    In this task you will learn how to:

    • Read various raw data files into Pandas
    • Read data into Pandas from a SQL database
    • Read data into Pandas from REST APIs

    To get started, open the extract.py file.

    A SQLAlchemy engine has already been created with a connection to the MySQL database at the top of the script.
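    As a rough illustration of what that engine looks like (the connection string below is a placeholder; extract.py defines the real credentials and driver):

      from sqlalchemy import create_engine

      # Placeholder connection string -- extract.py defines the real engine and credentials
      engine = create_engine("mysql+pymysql://user:password@localhost:3306/labdb")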

    Task 1.0.1

    Extracting Data from Flat Files

    Pandas supports reading flat files such as the following:

    • .csv
    • .xlsx
    • .json

    When Pandas reads the file, the contents are written to a dataframe in memory. Pandas dataframes are similar to SQL tables and are a great way to hold data while you clean and process it.

    For this task:

    1. Read the data/example.csv file into df1 with the pd.read_csv function.
    2. Read the data/example.xlsx file into df2 with the pd.read_excel function.
    3. Read the data/example.json file into df3 with the pd.read_json function. A sketch of these calls is shown below.

    Task 1.0.2

    Data Loading with Dask

    Dask is a powerful parallel processing framework. Dask not only lets you process your workloads faster, it also lets you work with datasets larger than your machine's memory. It's great when you have a really big Pandas dataframe and are doing lots of processing on the data in memory.

    Let's do a simple exercise to see how you can load several files into a Dask dataframe. Note that dask.dataframe has been imported as the alias dd.

    1. Observe the dask_data_ files in the data folder.
    2. Assign the df4 variable the value from the following code snippet:
    dd.read_csv("dask_data_*.csv")
    

    Note the wildcard syntax in the file name (*). This specifies that any file whose name begins with dask_data_ and has a .csv extension will be read into the dataframe.

    3. Optionally, use the get_memory_size and get_shape functions to view metadata about the Dask dataframe. A sketch using standard Dask calls is shown below.
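    The get_memory_size and get_shape helpers are provided by the lab; the sketch below illustrates the same idea using only standard Dask calls and the file pattern from the snippet above:

      import dask.dataframe as dd

      # Lazily read every CSV matching the pattern into a single Dask dataframe
      df4 = dd.read_csv("dask_data_*.csv")

      # Dask evaluates lazily; .compute() materializes the result as a Pandas dataframe
      pdf = df4.compute()
      print(pdf.shape)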

    Task 1.0.3

    Extract Data from Database Tables

    Pandas can also read data directly from database tables using pd.read_sql() with SQLAlchemy connectors. These SQLAlchemy connections are referred to as engines.

    In this challenge, you will learn how to connect to a database and use a raw SQL query to return results to a Pandas dataframe. This challenge uses a locally hosted MySQL database; the connection credentials have already been provided at the top of the extract.py script.

    Read a database query into a Pandas dataframe:

    1. Create a SQLAlchemy engine and point it to the data/sales.db SQLite database.
    2. Assign the following SQL query to the query variable: query = "SELECT TABLE_NAME FROM TABLES"
    3. Pass the two required parameters to the pd.read_sql function: query and con. con will be the engine defined at the top of the script that connects to MySQL.
    4. Use the pd.read_sql function to read the data into df5. A sketch of this pattern is shown below.
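    A minimal sketch of this pattern; the connection string below is a placeholder standing in for the engine already defined in extract.py:

      import pandas as pd
      from sqlalchemy import create_engine

      # Placeholder -- the lab's extract.py already defines `engine` with the real credentials
      engine = create_engine("mysql+pymysql://user:password@localhost:3306/information_schema")

      query = "SELECT TABLE_NAME FROM TABLES"   # query text from step 2 above
      df5 = pd.read_sql(query, con=engine)
      print(df5.head())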

    Task 1.0.4

    Extracting REST API Data with Requests

    The Requests library allows you to read data from a web endpoint and store that data as JSON. Requests is helpful for building ETL pipelines because it can talk to different services via REST API endpoints. Let's use Requests to get data from a locally running REST API:

    1. Click the + icon next to the Terminal and select New tab.
    2. On the New tab modal, select Terminal.
    3. In the terminal, execute the following command: python3 scripts/app.py. This command starts the Flask app, which hosts multiple REST API endpoints.
    4. Specify the following url variable for the REST API endpoint: http://localhost:5000/random-user.
    5. To make a GET request and print the response from the REST API, use the following code:
      r = requests.get(url)
      print(r.json())

    Task 1.0.5
      

    Authenticating with Requests

    Most REST APIs require authentication, and there are many forms of REST API authentication; in this challenge, you will use a Bearer token. To do this, you will need to assign an additional headers variable to the request that contains the token.

    Add a bearer token to the requests GET request and authenticate to the /protected endpoint.

    1. In the headers dictionary variable, add the key Authorization.
    2. In the headers dictionary variable add the value Bearer mysecrettoken123.
    3. Assign the variable response to the result of the following GET request (the complete call is sketched below):
      
      	response = requests.get("http://localhost:5000/protected", headers=headers)
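    Putting the pieces together, a minimal sketch of the authenticated call (the token value comes from the steps above):

      import requests

      url = "http://localhost:5000/protected"
      headers = {"Authorization": "Bearer mysecrettoken123"}   # Bearer token from step 2

      response = requests.get(url, headers=headers)
      print(response.status_code, response.json())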
      
      

    (Optional)

    Loading data with AIOHTTP

    In this task, you will compare the execution time of similar workloads run with the non-async.py (Requests) and async.py (aiohttp) scripts.

    1. In the scripts/ directory, open both the async.py and non-async.py Python files.
    2. Observe the difference in structure. Notice that the async.py file has the keyword async in front of the main() function and that the aiohttp.ClientSession() has the async with keyword.
    3. Run both scripts with the following commands and observe the difference in execution time. Feel free to change the num_iterations to greater values to experiment.

    To run the async.py file:

    python3 scripts/async.py
    

    To run the non-async.py file:

    python3 scripts/non-async.py
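    The lab's async.py contains the real implementation; the sketch below only illustrates the async/await pattern it uses, reusing the /random-user endpoint from the earlier task and an arbitrary iteration count:

      import asyncio
      import aiohttp

      async def fetch(session, url):
          # Await each response without blocking the other in-flight requests
          async with session.get(url) as resp:
              return await resp.json()

      async def main():
          url = "http://localhost:5000/random-user"   # endpoint from the earlier task
          async with aiohttp.ClientSession() as session:
              # Fire off ten requests concurrently and wait for all of them
              results = await asyncio.gather(*(fetch(session, url) for _ in range(10)))
              print(len(results), "responses received")

      asyncio.run(main())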
    
  3. Challenge

    Transforming Data

    Transform Phase

    The Transform phase is where raw data is refined, structured, and enriched to meet business needs before being loaded into a target system.

    This stage involves various data activities including:

    • Data cleansing
    • Deduplication
    • Standardization
    • Aggregation
    • Applying business logic
    • Joining

    Transformations can range from simple conversions, such as changing date formats, to complex operations like joining multiple datasets, or calculating key performance indicators.

    In this task you will learn how to:

    • Apply different data conversions and translations to Pandas data.
    • Query, filter, and aggregate Pandas data.
    • Join multiple Pandas dataframes together with pd.merge.

    To get started, open the transform.py file.

    Task 2.0.1

    Convert Pandas Column Data Types

    Remember that Pandas infers data types by default. This may be fine in most cases, but sometimes you will want to define data types explicitly or convert them.

    1. Observe the data dictionary. Notice that the ZipCode column is an integer.

    2. Convert the ZipCode column to a type of string, as shown in the sketch below.
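    A minimal sketch, assuming the dataframe is named df as in the lab script:

      # Cast ZipCode from integer to string so values such as leading zeros survive
      df["ZipCode"] = df["ZipCode"].astype(str)
      print(df.dtypes)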

    Task 2.0.2

    Convert Datetime Columns in Pandas

    Next, we will convert the Birthdate column.

    1. Observe the current format of the Birthdate column. You can add a print(df) statement to see the values of the columns.

    2. The column is currently in yyyy-mm-dd format. We would like to convert it to mm-dd-yyyy format.

    3. Assign the converted value back to the Birthdate column (see the sketch below).
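    A minimal sketch, assuming the dataframe is named df:

      import pandas as pd

      # Parse the yyyy-mm-dd values, then re-format them as mm-dd-yyyy strings
      df["Birthdate"] = pd.to_datetime(df["Birthdate"], format="%Y-%m-%d").dt.strftime("%m-%d-%Y")
      print(df["Birthdate"].head())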

    Task 2.0.3

    Using map to Replace Dataframe Values

    Next, we will learn about the df.replace() function. This function allows you to specify a dictionary of key-value pairs and replace existing dataframe values.

    1. Observe the replace_map dictionary. In this dictionary, the value being replaced is specified first, followed by its replacement.
    2. Write a df.replace() statement that uses the df column City and replaces its values with the ones specified in replace_map.
    3. Remember to assign the value of the df.replace() command back to the df variable, as sketched below.
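    A minimal sketch, assuming df and replace_map are the names used in transform.py:

      # replace_map holds {existing value: replacement value} pairs for the City column
      df = df.replace({"City": replace_map})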

    Task 2.0.4

    Using Regex to Replace Dirty Data

    You can also replace specific dataframe row values by specifying regex patterns. Regex is a string search pattern language that can identify specific strings in text. You can use regex to find strings within your dataframes and then replace or extract those values.

    For example, the following code snippet shows how to replace all @@@@ values within a dataframe with ####.

    
    df['Column'] = df['Column'].str.replace(r'\@\@\@\@', '####', regex=True)
    
    1. Observe the CharacterSeq column in the dataframe.
    2. Write a regex pattern that matches the four consecutive asterisk characters (****).
    3. Use df['Column'] = df['Column'].str.replace() with the regex pattern to replace the occurrences of the four asterisk characters **** with underscores ____.

    (Optional)

    Dataframe Filtering

    There are multiple ways that you can filter dataframe rows. One is the df.query() function, for example:

    df.query('Name == "Adam"')
    

    Or with self-referencing syntax:

    df[df.Name == "Adam"]
    

    Filtered dataframes can also be assigned to other dataframes. This is useful for generating reports or sub-datasets that only include specific values. For example:

    test = df[df['Name'] == "Adam"]
    test

    Task 2.0.5

    Creating Custom Calculated Columns
    
    You can write your own functions to translate data in dataframes based on existing values. You can use a lambda function to pass an existing data value to your function, similar to how you call a regular Python function.
    
    1. Observe the `cost_of_living_adjust` function. This function takes an `original` value and an `inc_percent`, or incremental percentage.
    2. Create a new column called `Salary_COL` and assign it the values of the `cost_of_living_adjust` function based on the existing `Salary` column and a `5` percent increase (see the sketch below).
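    A minimal sketch; the body of cost_of_living_adjust shown here is a hypothetical stand-in for the function defined in transform.py:

      # Hypothetical stand-in -- the lab's transform.py defines the real function
      def cost_of_living_adjust(original, inc_percent):
          return original * (1 + inc_percent / 100)

      # Pass each existing Salary value through the function with a 5 percent increase
      df["Salary_COL"] = df["Salary"].apply(lambda original: cost_of_living_adjust(original, 5))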
    (Optional)

    Dataframe Aggregating
    
    You can aggregate the values of your dataframe easily with the `.agg` function. `.agg` accepts keywords such as:
    
    - `count`
    - `min`
    - `max`
    - `mean`
    - `sum`
    
    To apply an aggregate to your dataframe you can specify the column name and the aggregate(s) you want to apply, for example:
    
    df.agg({'Salary': ['count', 'min', 'max', 'mean', 'sum']})
    

    You can also use .groupby syntax in combination with .agg:

    df.groupby("TShirtSize").agg({"Salary" : ["mean"]})
    

    Task 2.0.6

    Joining Data in Pandas

    Pandas offers the ability to join multiple dataframes together, similar to how you would in a relational database with SQL.

    Pandas uses the .merge() function to facilitate this process. When using .merge(), you specify the original dataframe and the joining dataframe, the on parameter, which is the column being matched, and the how parameter, which is the type of join.

    For example, the following code joins df1 and df2 together based on the matching ID columns that are present in both dataframes. It also shows the different types of joins available when using .merge():

    
    # Merge using different join types
    inner_merge = pd.merge(df1, df2, on='ID', how='inner')  
    left_merge = pd.merge(df1, df2, on='ID', how='left')    
    right_merge = pd.merge(df1, df2, on='ID', how='right')  
    outer_merge = pd.merge(df1, df2, on='ID', how='outer')  
    
    

    For this task:

    1. Observe both dataframes (df and df2)
    2. Use the UserId field as the on parameter.
    3. Use left as the how parameter.
    4. Assign the result of the merge to a dataframe named df_merged. A sketch is shown below.
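    A minimal sketch of the merge described above, assuming df and df2 are the dataframes prepared by the lab script:

      import pandas as pd

      # Left-join df2 onto df, matching rows on the shared UserId column
      df_merged = pd.merge(df, df2, on="UserId", how="left")
      print(df_merged.head())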
  4. Challenge

    Loading Data

    Loading Data

    Data writing in Pandas refers to the process of exporting or saving data from a Pandas DataFrame to various file formats or databases. Pandas provides several functions to write data, including to_csv(), to_excel(), to_sql(), and to_json(), allowing users to easily export data to formats like CSV, Excel, SQL databases, or JSON.

    These functions give you flexibility in saving data in a format that suits your needs, whether for sharing, archiving, or further processing. With options for customization such as delimiter specification, index handling, and database connection parameters, Pandas ensures a seamless experience in saving data to various destinations efficiently.
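    As a rough sketch of these writer functions (the file names and sample data below are illustrative only):

      import pandas as pd
      from sqlalchemy import create_engine

      df = pd.DataFrame({"Name": ["Ada", "Grace"], "Score": [95, 98]})   # illustrative data

      df.to_csv("output.csv", index=False)           # CSV file
      df.to_json("output.json", orient="records")    # JSON records
      df.to_excel("output.xlsx", index=False)        # Excel workbook (requires openpyxl)

      engine = create_engine("sqlite:///output.db")  # illustrative SQLite target
      df.to_sql("scores", con=engine, if_exists="replace", index=False)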

    To get started, open the load.py file.

    Task 3.0.1

    Loading Data with a SQLAlchemy Connection

    1. Create a dictionary with the following python code:

      	data = {
      	'Transaction_ID': ['T1001', 'T1002', 'T1003', 'T1004'],
      	'Customer_Name': ['Alice Johnson', 'Bob Lee', 'Charlie Kim', 'David Clark'],
      	'Product': ['Laptop', 'Smartphone', 'Tablet', 'Smartwatch'],
      	'Quantity': [1, 2, 3, 1],
      	'Price_Per_Unit': [1200, 800, 400, 200],
      	'Total_Sale': [1200, 1600, 1200, 200],
      	'Sale_Date': ['2025-03-20', '2025-03-21', '2025-03-21', '2025-03-22']
      	}
      
      
    2. Convert the dictionary into a Pandas dataframe variable df.

    3. Use df.to_csv to write the dataframe contents to a Transactions.csv file in the data directory. Include the following parameter when using the to_csv function (a sketch follows the list):

      • index=False
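    A minimal sketch of steps 2 and 3, assuming data is the dictionary from step 1:

      import pandas as pd

      # Convert the dictionary from step 1 into a dataframe
      df = pd.DataFrame(data)

      # Write it to the data directory without the index column
      df.to_csv("data/Transactions.csv", index=False)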

    (Optional)

    Validating Data Operations

    You can use assert statements to validate that a specific data pipeline has finished successfully. Use assert statements to check that row counts, column types, and totals all match the expected results.

    For example, to assert that the file contains the expected number of rows, you could use the following code:

    
    import pandas as pd

    # csv_file should point at the output file produced by your pipeline
    csv_df = pd.read_csv(csv_file)
    assert csv_df.shape[0] == 3, "Row counts do not match expected values!"
    print(f"Row counts match: {csv_df.shape[0]} rows")
    
    

Ian is a database enthusiast with particular expertise in the Microsoft data stack. Other technologies that Ian has expert level experience with include ETL, Python development, cloud architecture and systems automation. Ian holds the Microsoft Certified Solutions Expert (MCSE) for data management and analytics as well as the AWS Solution Architect associate certifications. Ian also writes for multiple technical blogs including SuperFunSQL.com which he founded in 2019.
