
Build an ETL Pipeline with PySpark

In this hands-on Code Lab, you’ll learn how to build an end-to-end ETL pipeline using PySpark — all within a Jupyter Notebook environment. You’ll extract structured and semi-structured data (CSV, JSON, and Parquet), transform it using Spark DataFrame operations, and load it into optimized storage with partitioning and bucketing strategies. Along the way, you’ll apply best practices for schema handling, null value treatment, deduplication, filtering, joins, aggregations, and incremental updates using timestamp filtering. This lab focuses on practical techniques for building performant ETL pipelines that are easy to run, test, and maintain. By the end, you’ll be able to extract data from multiple formats, perform scalable transformations using PySpark, and write out optimized datasets ready for downstream analytics. Whether you're a data engineer, analyst, or developer, this lab gives you hands-on experience with the foundational patterns of building ETL pipelines in PySpark.


Path Info

Level
Intermediate
Duration
45m
Published
Apr 08, 2025


Table of Contents

  1. Challenge

    Extracting Data from Multiple Sources

    Step 1: Extracting Data from Multiple Sources

    To get started, open the file on the right titled step1.ipynb. You'll complete each task for Step 1 in that Jupyter Notebook file. Remember, you must run the cells (Control/Command + Enter) for each task before moving on to the next task in the Jupyter Notebook. Continue until you have completed all tasks in this step. When you're ready to move on, come back to the file list and click the file for the next step, and repeat until you have completed all steps.

    Background & Introduction

    In a modern data pipeline, data comes from multiple sources in various formats, including CSV, JSON, and Parquet. Efficiently extracting this data is a crucial first step in building a robust Extract, Transform, and Load (ETL) pipeline.

    In this step, you will focus on extracting structured and semi-structured data from different file formats and preparing it for transformation. You will also apply incremental data extraction, which optimizes performance by processing only new or updated records instead of reloading entire datasets.

    Imagine you are working for a video streaming platform that collects user activity logs, subscription changes, and content metadata. Your goal is to efficiently load this data into a PySpark DataFrame for further processing.


    Objective:

    Learn how to load structured and semi-structured data into PySpark DataFrames.


    Setup Instructions:

    Install Dependencies

    In this environment, PySpark is already installed for you. In your local environment, you would need to ensure PySpark is installed with the following command:

    !pip install pyspark
    

    Initialize Spark Session and Reduce Verbose Logging

    # Import necessary libraries
    from pyspark.sql import SparkSession
    import logging
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()
    
    # Reduce verbose logging
    spark.sparkContext.setLogLevel("ERROR")
    
    # Configure logging to suppress warnings
    logging.getLogger("py4j").setLevel(logging.ERROR)
    

    Specify Path

    viewing_history_path = "video_streaming_data/viewing_history.csv"
    users_path = "video_streaming_data/users.json"
    videos_catalog_path = "video_streaming_data/videos_catalog.parquet"
    subscription_updates_path = "video_streaming_data/subscription_updates.csv"
    

    Tasks:

    Task 1: Load CSV Data

    • Read viewing_history.csv into a DataFrame.
    🔍 Hint
    • Use spark.read.option("header", True).csv(...) to read CSV files.

    • Make sure the path points to viewing_history.csv (the viewing_history_path defined above).

    • Use .printSchema() and .show() to confirm everything loaded properly.

    🔑 Solution
    # Load the CSV file using Spark
    viewing_history_df = spark.read.option("header", True).option("inferSchema", True).csv(viewing_history_path)
    
    # Preview the schema
    viewing_history_df.printSchema()
    
    # Preview the data
    viewing_history_df.show(5)
    

    Task 2: Load JSON Data

    • Read users.json, handling nested structures.
    🔍 Hint
    • If your JSON file is an array wrapped in [...] (which it is), you must enable option("multiline", "true").

    • Use .printSchema() to check that Spark inferred the structure.

    • If everything loads as null, check your path or add a defined schema.

    • Use .show() to preview the data cleanly.

    🔑 Solution
    # Read the JSON file using multiline mode
    users_df = spark.read.option("multiline", "true").json(users_path)
    
    # Preview the schema
    users_df.printSchema()
    
    # Preview the data
    users_df.show(5)
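
    If Spark infers every column as null, or you want stricter control than inference provides, you can supply a user-defined schema instead. Below is a minimal sketch; the field names and types are assumptions based on the lab's users data, so adjust them to match what .printSchema() reports:

    from pyspark.sql.types import StructType, StructField, StringType

    # Assumed field names and types for illustration -- confirm against users_df.printSchema()
    user_schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("email", StringType(), True),
        StructField("preferred_language", StringType(), True),
        StructField("subscription_date", StringType(), True),
    ])

    # Read with an explicit schema instead of relying on inference
    users_df_explicit = spark.read.option("multiline", "true").schema(user_schema).json(users_path)
    users_df_explicit.printSchema()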
    

    Task 3: Load Parquet Data

    • Read videos_catalog.parquet and explore schema evolution.
    🔍 Hint - Parquet files store schema with the data, so you don’t need to define one.
    • Just use spark.read.parquet(path) to load it directly.

    • Use .printSchema() and .show() to inspect the contents.

    • This is one of the most optimized formats for big data workloads.

    🔑 Solution
    # Read the Parquet file
    videos_catalog_df = spark.read.parquet(videos_catalog_path)
    
    # Inspect the schema
    videos_catalog_df.printSchema()
    
    # Preview the data
    videos_catalog_df.show(5)
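
    The task also mentions schema evolution: over time, new Parquet files can pick up additional columns. When a Parquet dataset is made up of multiple files with slightly different schemas, Spark can reconcile them with the mergeSchema read option. A minimal sketch, assuming the catalog were split across several such files:

    # Merge schemas across Parquet files that may have gained columns over time
    evolved_df = spark.read.option("mergeSchema", "true").parquet(videos_catalog_path)
    evolved_df.printSchema()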
    

    Task 4: Extract Incremental Updates

    • Load subscription_updates.csv and analyze changes.
    🔍 Hint - Read the CSV just like in Task 1, with `option("header", True)` and optionally `option("inferSchema", True)`.
    • Check for a column like change_date or updated_at in .printSchema().

    • Use col("change_date") >= "YYYY-MM-DD" inside .filter() to extract recent records.

    • This simulates reading only new data since your last pipeline run.

    🔑 Solution
    from pyspark.sql.functions import col
    
    # Enable header and schema inference:
    subscription_updates_df = spark.read.option("header", True).option("inferSchema", True).csv(subscription_updates_path)
    
    # Extract only records from February 7, 2024, onwards
    latest_updates_df = subscription_updates_df.filter(col("change_date") >= "2024-02-07")
    
    # Preview the data
    latest_updates_df.show(5)
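
    In a production pipeline, the cutoff date would not be hard-coded. It would typically come from a watermark stored at the end of the previous run (for example, the maximum change_date already processed). A minimal sketch of that pattern, with the watermark value assumed for illustration:

    from pyspark.sql.functions import col, max as _max

    # Assume this value was persisted by the previous run (e.g., in a metadata file or table)
    last_processed = "2024-02-07"

    # Pull only records newer than the stored watermark
    incremental_df = subscription_updates_df.filter(col("change_date") > last_processed)

    # After processing, compute the new watermark to persist for the next run
    new_watermark = incremental_df.agg(_max("change_date").alias("max_change_date")).collect()[0]["max_change_date"]
    print(f"New watermark to store: {new_watermark}")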
    

    Key Concepts Covered:

    • Schema inference vs. user-defined schema
    • Handling different file formats
    • Incremental vs. batch extraction

    Validation

    Once you’ve completed all tasks in this step, you will have:

    • Loaded data from multiple sources into PySpark
    • Handled schema inference and explored structured vs. semi-structured data
    • Extracted incremental updates using timestamp filtering
  2. Challenge

    Transforming and Cleaning Data

    Step 2: Transforming and Cleaning Data

    To get started, open the file on the right titled step2.ipynb. You'll complete each task for Step 2 in that Jupyter Notebook file. Remember, you must run the cells (Control/Command + Enter) for each task before moving on to the next task in the Jupyter Notebook. Continue until you have completed all tasks in this step. When you're ready to move on, come back to the file list and click the file for the next step, and repeat until you have completed all steps.

    Background & Introduction

    In this step, you will learn how to clean, transform, and optimize data using PySpark. These processes ensure data integrity, consistency, and reliability for downstream analytics.

    Data transformation is a key component of ETL pipelines, ensuring raw data is standardized, deduplicated, enriched, and structured properly. In this step, you will perform missing value handling, deduplication, type conversions, joins, aggregations, and custom transformations.

    Setup Instructions:

    Initialize Spark Session and Reduce Verbose Logging

    # Import necessary libraries
    from pyspark.sql import SparkSession
    import logging
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()
    
    # Reduce verbose logging
    spark.sparkContext.setLogLevel("ERROR")
    
    # Configure logging to suppress warnings
    logging.getLogger("py4j").setLevel(logging.ERROR)
    

    Load the Data

    Read the users and viewing history datasets.

    viewing_history_path = "video_streaming_data/viewing_history.csv"
    users_path = "video_streaming_data/users.json"
    
    viewing_history_df = spark.read.option("header", "true").csv(viewing_history_path)
    users_df = spark.read.option("multiline", "true").json(users_path)
    

    Task 1: Handle Missing Values

    • Identify missing values in viewing_history_df and users_df.
    • Apply .fillna() to replace nulls where applicable.
    • Drop records with excessive missing values.
    🔍 Hint
    • Use .fillna() to replace nulls.
    • Use .dropna() to remove records with excessive missing data.
    • Use when(...).otherwise(...) to count nulls before and after cleaning.
    🔑 Solution
    from pyspark.sql.functions import col, sum as _sum, when
    	
    # Get the Null counts in viewing_history
    print(" Null counts in viewing_history_df:")
    viewing_history_df.select([
        _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in viewing_history_df.columns
    ]).show()
    
    # Get the Null counts in users_df
    print(" Null counts in users_df:")
    users_df.select([
        _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in users_df.columns
    ]).show()
    
    # Fill missing device_type with "Unknown"
    viewing_history_df = viewing_history_df.fillna({"device_type": "Unknown"})
    
    # Drop account_status column (mostly null)
    if "account_status" in viewing_history_df.columns:
        viewing_history_df = viewing_history_df.drop("account_status")
    
    # Drop rows missing user_id or watched_at (critical fields)
    viewing_history_df = viewing_history_df.dropna(subset=["user_id", "watched_at"])
    
    # Fill preferred_language with "Unknown"
    users_df = users_df.fillna({"preferred_language": "Unknown"})
    
    # Fill missing subscription_date with a placeholder
    users_df = users_df.fillna({"subscription_date": "2020-01-01"})
    
    # Verify by rerunning
    from pyspark.sql.functions import col, sum as _sum, when
    	
    # Get the Null counts in viewing_history
    print(" Null counts in viewing_history_df:")
    viewing_history_df.select([
        _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in viewing_history_df.columns
    ]).show()
    
    # Get the Null counts in users_df
    print(" Null counts in users_df:")
    users_df.select([
        _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in users_df.columns
    ]).show()
    

    Task 2: Remove Duplicates

    User data (users_df) may contain duplicate records due to data exports, ingestion errors, or tracking issues. If not removed, these duplicates can cause duplicate rows in your joined dataset and inflate user-level metrics.

    In this task, you will:

    • Identify duplicate user records.
    • Remove duplicates by retaining a single record per user_id.
    🔍 Hint
    • Use .dropDuplicates(["user_id"]) to retain only one record per user.
    • Use .count() before and after cleaning to verify.
    • Focus on the user_id column, as this is the key used in joins.
    🔑 Solution
    # Check for duplicates
    users_total = users_df.count()
    unique_user_ids = users_df.select("user_id").distinct().count()
    
    # Print total rows, distinct rows, and duplicate rows
    print(f"Total user records: {users_total}")
    print(f"Unique user_id count: {unique_user_ids}")
    print(f"Duplicate user rows: {users_total - unique_user_ids}")
    
    # Remove duplicates based on user_id
    users_df = users_df.dropDuplicates(["user_id"])
    
    #  Confirm cleanup
    print(f" Users after deduplication: {users_df.count()} rows")
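
    Note that .dropDuplicates(["user_id"]) keeps one arbitrary row per user_id even when other columns differ. If you only want to drop rows that are identical across every column (exact duplicates), call .dropDuplicates() with no arguments, as in this short sketch:

    # Remove rows where every column value is identical (exact duplicates only)
    users_exact_dedup_df = users_df.dropDuplicates()
    print(f"Users after exact-duplicate removal: {users_exact_dedup_df.count()} rows")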
    

    Task 3: Convert Data Types

    Ensure that critical fields like user_id and watched_at are using the correct types — IntegerType for IDs and TimestampType for datetime fields.

    🔍 Hint
    • Use .cast() to convert user_id to integer.
    • Use to_timestamp() to convert string timestamps into actual timestamp type for watched_at.
    • Always verify with .printSchema() after conversions.
    🔑 Solution
    # Check the current schema
    viewing_history_df.printSchema()
    
    from pyspark.sql.functions import to_timestamp, col
    
    # Convert user_id to IntegerType
    viewing_history_df = viewing_history_df.withColumn("user_id", col("user_id").cast("int"))
    
    # Convert watched_at to TimestampType
    viewing_history_df = viewing_history_df.withColumn("watched_at", to_timestamp(col("watched_at")))
    
    # Confirm the changes
    viewing_history_df.printSchema()
    viewing_history_df.select("user_id", "watched_at").show(5)
    

    Task 4: Join Datasets

    Join viewing_history_df with users_df to enrich viewing records.

    🔍 Hint
    • Use .join() with on="user_id" and how="inner" to merge DataFrames.
    • Ensure both user_id columns are the same type before joining.
    • You can preview with .select(...).show() after the join to verify it worked.
    🔑 Solution
    # Optional: Cast user_id in both dataframes
    viewing_history_df = viewing_history_df.withColumn("user_id", col("user_id").cast("int"))
    users_df = users_df.withColumn("user_id", col("user_id").cast("int"))
    
    # Make sure both DataFrames are clean and have user_id
    viewing_history_df.select("user_id").show(3)
    users_df.select("user_id").show(3)
    
    
    # Join both DataFrames
    full_df = viewing_history_df.join(users_df, on="user_id", how="inner")
    
    # Preview Results 
    full_df.select("user_id", "video_id", "watched_at", "email", "preferred_language").show(5)
    

    Task 5: Aggregation & Window Functions

    Use PySpark to generate key metrics from enriched viewing data:

    • How many times each user watched each video
    • How many total views each user has
    • (Optional) Rank most-watched videos per user
    🔍 Hint
    • Use .groupBy() and .agg(count("*")) for aggregating metrics like watch counts.

    • Use Window.partitionBy(...).orderBy(...) with row_number() to rank within groups.

    • Remember to import Window and row_number from pyspark.sql.window and functions.

    🔑 Solution
    # Count Views per User per Video
    from pyspark.sql.functions import count
    
    # Count how many times each user watched each video
    most_watched_df = full_df.groupBy("user_id", "video_id").agg(
        count("*").alias("watch_count")
    )
    
    # Show most watched videos overall
    most_watched_df.orderBy("watch_count", ascending=False).show(10)
    

    If you get an IllegalStateException error message, restart the kernel and try again.

    ## Add Total Views per User
    # Count total watch events per user
    total_views_df = full_df.groupBy("user_id").agg(
        count("*").alias("total_views")
    )
    
    # Show most active users
    total_views_df.orderBy("total_views", ascending=False).show(10)
    
    ### Rank Top 3 Most-Watched Videos per User
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number
    
    # Create window spec to rank videos within each user
    window_spec = Window.partitionBy("user_id").orderBy(col("watch_count").desc())
    
    # Add ranking column
    ranked_df = most_watched_df.withColumn("rank", row_number().over(window_spec))
    
    # Filter to get top 3 videos per user
    top_videos_per_user = ranked_df.filter(col("rank") <= 3)
    
    # Show results
    top_videos_per_user.orderBy("user_id", "rank").show(10)
    

    Task 6: Apply UDFs & Vectorized UDFs

    Use a User-Defined Function (UDF) to normalize the device_type column — standardizing inconsistent labels like "iPhone 13", "IPHONE", "android-tablet", etc. into clean categories like "iPhone", "Android", "Other".

    🔍 Hint
    • Define a Python function to standardize messy string fields like device_type.
    • Use udf() to register it with PySpark.
    • Apply with .withColumn("new_column", your_udf(col("device_type"))).
    • Preview the result using .select("device_type", "normalized_device").show().
    🔑 Solution
    # Preview device types
    viewing_history_df.select("device_type").distinct().show()
    
    ### Define a Python UDF to clean device labels
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # Define the normalization logic
    def normalize_device(device):
        if device is None:
            return "Unknown"
        device = device.lower()
        if "iphone" in device:
            return "iPhone"
        elif "android" in device:
            return "Android"
        else:
            return "Other"
    
    # Register as a PySpark UDF
    normalize_device_udf = udf(normalize_device, StringType())
    
    ### Apply the UDF to your DataFrame
    # Create a new column with normalized values
    viewing_history_df = viewing_history_df.withColumn(
        "normalized_device", normalize_device_udf(viewing_history_df["device_type"])
    )
    
    ### Inspect the result
    viewing_history_df.select("device_type", "normalized_device").distinct().show()
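
    The task title also mentions vectorized UDFs. A regular Python UDF is called once per row, while a pandas UDF receives whole batches as pandas Series (exchanged via Apache Arrow), which usually performs better. Below is a minimal sketch of the same normalization written as a pandas UDF; it assumes pandas and PyArrow are available in the environment:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    @pandas_udf(StringType())
    def normalize_device_vectorized(devices: pd.Series) -> pd.Series:
        # The UDF receives an entire batch of device strings as a pandas Series
        def normalize(device):
            if device is None or pd.isna(device):
                return "Unknown"
            device = device.lower()
            if "iphone" in device:
                return "iPhone"
            if "android" in device:
                return "Android"
            return "Other"
        return devices.map(normalize)

    # Apply the vectorized UDF and compare against the regular UDF's output
    viewing_history_df = viewing_history_df.withColumn(
        "normalized_device_vec", normalize_device_vectorized(viewing_history_df["device_type"])
    )
    viewing_history_df.select("device_type", "normalized_device", "normalized_device_vec").distinct().show()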
    

    Validation

    Once you've completed all tasks in this step, you will have:

    • Replaced or removed missing values using .fillna() and .dropna()

    • Removed exact and partial duplicates using .dropDuplicates()

    • Converted user_id to Integer and watched_at to Timestamp for consistency

    • Joined viewing_history_df with users_df to enrich the dataset

    • Calculated watch count metrics using .groupBy().agg()

    • Ranked top watched videos per user using window functions

    • Normalized inconsistent device names using a custom UDF

      This ensures the dataset is ready for further analysis and modeling.

  3. Challenge

    Loading Data Efficiently

    Step 3: Loading Data Efficiently

    To get started, open the file on the right titled step3.ipynb. You'll complete each task for Step 3 in that Jupyter Notebook file. Remember, you must run the cells (Control/Command + Enter) for each task before moving on to the next task in the Jupyter Notebook. Continue until you have completed all tasks in this step.

    Background & Introduction

    Once data is extracted and transformed, the next step is to store it efficiently for downstream analytics and reporting. Efficient data storage improves:

    • Query Performance – Reduces scan time and improves retrieval speed
    • Scalability – Helps manage large datasets effectively
    • Storage Optimization – Reduces redundancy and improves data compression

    In this step, you will optimize how data is stored in PySpark by:

    • Writing data efficiently to Parquet format
    • Using partitioning to speed up queries
    • Implementing bucketing to improve lookup performance

    Setup Instructions:

    Initialize Spark Session and Reduce Verbose Logging

    # Import necessary libraries
    from pyspark.sql import SparkSession
    import logging
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()
    
    # Reduce verbose logging
    spark.sparkContext.setLogLevel("ERROR")
    
    # Configure logging to suppress warnings
    logging.getLogger("py4j").setLevel(logging.ERROR)
    

    Load the Data

    Read the viewing history dataset and define the output path for processed data.

    viewing_history_path = "video_streaming_data/viewing_history.csv"
    
    viewing_history = spark.read.option("header", "true").csv(viewing_history_path)
    
    processed_path = "video_streaming_data/processed"
    
    

    Task 1: Write Data to Parquet

    Description:
    Parquet is a columnar storage format optimized for analytics. Writing data in Parquet format:

    • Reduces file size due to built-in compression
    • Speeds up query execution for analytical workloads

    Instructions:
    Save a sample of the viewing_history DataFrame in Parquet format to local disk. This helps you test the write process without overloading your system.

    🔍 Hint - Use .limit() to reduce the number of rows and .mode("overwrite") to avoid duplicate writes. Avoid partitioning in this task to keep it lightweight.
    🔑 Solution
    # Limit rows during development to reduce memory pressure in this lab environment
    sample_df = viewing_history.limit(100000)
    
    # Write the sample to disk in Parquet format
    sample_df.write.mode("overwrite").parquet(f"{processed_path}/viewing_history.parquet")
    
    print("Viewing history successfully written to Parquet.")
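
    To confirm the write succeeded, you can read the Parquet output back and check that the row count matches the sample. A quick check using the paths defined above:

    # Read the Parquet output back and verify the row count matches the sample
    written_df = spark.read.parquet(f"{processed_path}/viewing_history.parquet")
    print(f"Rows written: {written_df.count()} (expected {sample_df.count()})")
    written_df.printSchema()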
    

    Task 2: Implement Partitioning

    Description:
    Partitioning splits data into smaller chunks based on a column (e.g., device_type). This allows Spark to scan only the relevant partitions instead of reading the entire dataset.

    Partitioning:

    • Reduces query time by minimizing the number of files read
    • Improves scalability by distributing data efficiently

    Instructions:
    Write the full viewing_history dataset with partitioning on the device_type column to optimize downstream queries.

    🔍 Hint - Choose device_type as the partition column because it has low cardinality. This avoids performance issues caused by high-cardinality columns like user_id.
    🔑 Solution
    # Write the full dataset with partitioning on device_type
    viewing_history.write.partitionBy("device_type").mode("overwrite").parquet(f"{processed_path}/viewing_history_partitioned")
    print("Viewing history data has been successfully written with partitioning!")
    
    # Verify Partitioning by checking folder
    import os
    
    partitioned_path = f"{processed_path}/viewing_history_partitioned"
    print("Partitions created:", os.listdir(partitioned_path))
    
    # Verify partitioned column values
    df_partitioned = spark.read.parquet(partitioned_path)
    df_partitioned.select("device_type").distinct().show()
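
    To see the benefit of partitioning, filter on the partition column when reading: Spark prunes the scan down to the matching device_type folders instead of reading the whole dataset. A quick sketch (the device value shown is an assumption; use one of the values listed by the query above):

    from pyspark.sql.functions import col

    # Filtering on the partition column lets Spark read only the matching folder(s)
    mobile_views_df = spark.read.parquet(partitioned_path).filter(col("device_type") == "Mobile")
    mobile_views_df.show(5)

    # The physical plan should list PartitionFilters, confirming that pruning occurred
    mobile_views_df.explain()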
    

    Task 3: Implement Bucketing

    Description:
    Bucketing distributes data into a fixed number of buckets based on a column, optimizing queries that involve filtering on that column.

    Bucketing:

    • Reduces shuffle operations in queries
    • Optimizes joins and lookups by pre-sorting data within each bucket
    • Speeds up queries when filtering by the bucketed column

    Instructions:

    • Bucket viewing_history by video_id.
    • Use .sortBy("video_id") for faster lookups.
    • Print a success message after the write operation.
    🔍 Hint - Use .write.bucketBy(bucket_count, "column_name").sortBy("column_name").format("parquet") to create bucketed tables.
    🔑 Solution
    # Bucket data by video_id and sort by video_id
    (
        viewing_history.write.bucketBy(10, "video_id")
        .sortBy("video_id")
        .mode("overwrite")
        .format("parquet")
        .option("path", f"{processed_path}/viewing_history_bucketed")
        .saveAsTable("viewing_history_bucketed")
    )
    
    print("Data successfully written with bucketing!")
    

    Verifying That Bucketing Was Applied

    Since Spark stores bucketed tables in its default warehouse (/spark-warehouse), you need to query the table directly from Spark's metastore.


    1. Check If the Table Exists

    # Check if the table exists
    spark.catalog.listTables()
    

    Expected Output:
    The "viewing_history_bucketed" table should appear in the list.


    2. Verify Bucketing in Table Schema

    # Verify bucketing in the table schema (show enough rows so the bucketing details are visible)
    spark.sql("DESCRIBE FORMATTED viewing_history_bucketed").show(50, truncate=False)
    

    Expected Output:
    The output should include "Num Buckets" with a value of 10, confirming that bucketing is applied.


    3. Read the Table from Spark's Metastore

    Since the table is stored in /spark-warehouse, read it back through Spark's table catalog:

    # Read the table from Spark's Metastore
    bucketed_df = spark.read.table("viewing_history_bucketed")
    bucketed_df.show(5)
    

    Expected Output:
    The table should load successfully, confirming that bucketing is applied.


    4. Benchmark Query Performance:

    Before Bucketing (Slower Query)

    # Query execution time before bucketing
    import time
    from pyspark.sql.functions import col
    
    start_time = time.time()
    viewing_history.filter(col("video_id") == '56789').show()
    end_time = time.time()
    
    print(f" Query Execution Time (Before Bucketing): {end_time - start_time:.4f} seconds")
    

    After Bucketing (Faster Query)

    # Query execution time after bucketing
    start_time = time.time()
    spark.sql("SELECT * FROM viewing_history_bucketed WHERE video_id = '56789'").show()
    end_time = time.time()
    
    print(f" Query Execution Time (After Bucketing): {end_time - start_time:.4f} seconds")
    

    Expected Outcome:
    The query execution time on the bucketed table should be significantly lower than the non-bucketed version.


    Key Concepts Covered

    Once you've completed all tasks in this step, you should have:

    • Optimized data storage using Parquet
    • Partitioned data for faster query performance
    • Implemented bucketing to improve lookup efficiency

    Summary

    • Partitioning reduces query scan time by storing data in separate folders.
    • Bucketing improves performance for filtering and joins on bucketed columns.
    • Writing to Parquet ensures efficient storage and analytics.

    These techniques significantly optimize an ETL pipeline, making data retrieval faster and more efficient.


Ifedayo is a tech specialist with expertise in Cloud Data Solutions, Artificial Intelligence, and Web Development. He loves teaching and watching people learn.
