
Build an ETL Pipeline with PySpark
In this hands-on Code Lab, you’ll learn how to build an end-to-end ETL pipeline using PySpark — all within a Jupyter Notebook environment. You’ll extract structured and semi-structured data (CSV, JSON, and Parquet), transform it using Spark DataFrame operations, and load it into optimized storage with partitioning and bucketing strategies. Along the way, you’ll apply best practices for schema handling, null value treatment, deduplication, filtering, joins, aggregations, and incremental updates using timestamp filtering. This lab focuses on practical techniques for building performant ETL pipelines that are easy to run, test, and maintain. By the end, you’ll be able to extract data from multiple formats, perform scalable transformations using PySpark, and write out optimized datasets ready for downstream analytics. Whether you're a data engineer, analyst, or developer, this lab gives you hands-on experience with the foundational patterns of building ETL pipelines in PySpark.

Step 1: Extracting Data from Multiple Sources
To get started, open the file on the right titled `step1.ipynb`. You'll complete each task for Step 1 in that Jupyter Notebook file. Remember, you must run the cells (Control/Command (⌘) + Enter) for each task before moving on to the next task in the Jupyter Notebook. Continue until you have completed all tasks in this step. When you are ready to move on to the next step, come back and click on the file for that step, and repeat until you have completed all tasks.

Background & Introduction
In a modern data pipeline, data comes from multiple sources in various formats, including CSV, JSON, and Parquet. Efficiently extracting this data is a crucial first step in building a robust Extract, Transform, and Load (ETL) pipeline.
In this step, you will focus on extracting structured and semi-structured data from different file formats and preparing it for transformation. You will also introduce incremental data extraction, which helps optimize performance by only processing new or updated records instead of reloading entire datasets.
Imagine you are working for a video streaming platform that collects user activity logs, subscription changes, and content metadata. Your goal is to efficiently load this data into a PySpark DataFrame for further processing.
Objective:
Learn how to load structured and semi-structured data into PySpark DataFrames.
Setup Instructions:
Install Dependencies
In this environment, PySpark is already installed for you. In your local environment, you would need to install PySpark with the following command:

```python
!pip install pyspark
```
Initialize Spark Session and Reduce Verbose Logging
```python
# Import necessary libraries
from pyspark.sql import SparkSession
import logging

# Initialize Spark session
spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()

# Reduce verbose logging
spark.sparkContext.setLogLevel("ERROR")

# Configure logging to suppress warnings
logging.getLogger("py4j").setLevel(logging.ERROR)
```
Specify Path
```python
viewing_history_path = "video_streaming_data/viewing_history.csv"
users_path = "video_streaming_data/users.json"
videos_catalog_path = "video_streaming_data/videos_catalog.parquet"
subscription_updates_path = "video_streaming_data/subscription_updates.csv"
```
Tasks:
Task 1: Load CSV Data
- Read `viewing_history.csv` into a DataFrame.
🔍 Hint
- Use `spark.read.option("header", True).csv(...)` to read CSV files.
- Make sure the path points to `viewing_history.csv` inside the `video_streaming_data` directory.
- Use `.printSchema()` and `.show()` to confirm everything loaded properly.
🔑 Solution
```python
# Load the CSV file using Spark
viewing_history_df = spark.read.option("header", True).option("inferSchema", True).csv(viewing_history_path)

# Preview the schema
viewing_history_df.printSchema()

# Preview the data
viewing_history_df.show(5)
```
Task 2: Load JSON Data
- Read `users.json`, handling nested structures.
🔍 Hint
- If your JSON file is an array wrapped in `[...]` (which it is), you must enable `option("multiline", "true")`.
- Use `.printSchema()` to check that Spark inferred the structure.
- If everything loads as null, check your path or add a defined schema (see the sketch after the solution below).
- Use `.show()` to preview data cleanly.
🔑 Solution
```python
# Read the JSON file using multiline mode
users_df = spark.read.option("multiline", "true").json(users_path)

# Preview the schema
users_df.printSchema()

# Preview the data
users_df.show(5)
```
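If schema inference comes back with nulls or wrong types, you can supply an explicit schema instead. This is a minimal sketch: the field names and types below are assumptions based on columns used later in this lab (`user_id`, `email`, `preferred_language`, `subscription_date`), so adjust them to match the actual file.

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical explicit schema -- field names and types are assumptions, verify against users.json
users_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("email", StringType(), True),
    StructField("preferred_language", StringType(), True),
    StructField("subscription_date", StringType(), True),
])

users_df_explicit = (
    spark.read.option("multiline", "true").schema(users_schema).json(users_path)
)
users_df_explicit.printSchema()
```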
Task 3: Load Parquet Data
- Read `videos_catalog.parquet` and explore schema evolution.
🔍 Hint
- Parquet files store the schema with the data, so you don't need to define one.
- Just use `spark.read.parquet(path)` to load it directly.
- Use `.printSchema()` and `.show()` to inspect the contents.
- Parquet is one of the most optimized formats for big data workloads.
🔑 Solution
```python
# Read the Parquet file
videos_catalog_df = spark.read.parquet(videos_catalog_path)

# Inspect the schema
videos_catalog_df.printSchema()

# Preview the data
videos_catalog_df.show(5)
```
Task 4: Extract Incremental Updates
- Load `subscription_updates.csv` and analyze changes.
🔍 Hint
- Read the CSV just like in Task 1 with `option("header", True)` and optionally `option("inferSchema", True)`.
- Check for a column like `change_date` or `updated_at` in `.printSchema()`.
- Use `col("change_date") >= "YYYY-MM-DD"` inside `.filter()` to extract recent records.
- This simulates reading only new data since your last pipeline run (see the sketch after the solution below).
🔑 Solution
```python
from pyspark.sql.functions import col

# Enable header and schema inference
subscription_updates_df = spark.read.option("header", True).option("inferSchema", True).csv(subscription_updates_path)

# Extract only records from February 7, 2024, onwards
latest_updates_df = subscription_updates_df.filter(col("change_date") >= "2024-02-07")

# Preview the data
latest_updates_df.show(5)
```
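In a production pipeline, the cutoff date would not be hard-coded: you would persist a high-water mark from the previous run and filter on it. A minimal sketch of that pattern (the variable and its source are hypothetical):

```python
# Hypothetical high-water mark -- in practice this would come from a metadata
# table, checkpoint file, or job parameter rather than a literal string.
last_run_cutoff = "2024-02-07"

incremental_df = subscription_updates_df.filter(col("change_date") >= last_run_cutoff)
incremental_df.show(5)
```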
Key Concepts Covered:
- Schema inference vs. user-defined schema
- Handling different file formats
- Incremental vs. batch extraction
Validation
Once you've completed all tasks in this step, you will have:
- Loaded data from multiple sources into PySpark
- Handled schema inference and explored structured vs. semi-structured data
- Extracted incremental updates for real-time processing
Step 2: Transforming and Cleaning Data
To get started, open the file on the right titled `step2.ipynb`. You'll complete each task for Step 2 in that Jupyter Notebook file. Remember, you must run the cells (Control/Command (⌘) + Enter) for each task before moving on to the next task in the Jupyter Notebook. Continue until you have completed all tasks in this step. When you are ready to move on to the next step, come back and click on the file for that step, and repeat until you have completed all tasks.
In this step, you will learn how to clean, transform, and optimize data using PySpark. These processes ensure data integrity, consistency, and reliability for downstream analytics.
Data transformation is a key component of ETL pipelines, ensuring raw data is standardized, deduplicated, enriched, and structured properly. In this step, you will perform missing value handling, deduplication, type conversions, joins, aggregations, and custom transformations.
Setup Instructions:
Initialize Spark Session and Reduce Verbose Logging
```python
# Import necessary libraries
from pyspark.sql import SparkSession
import logging

# Initialize Spark session
spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()

# Reduce verbose logging
spark.sparkContext.setLogLevel("ERROR")

# Configure logging to suppress warnings
logging.getLogger("py4j").setLevel(logging.ERROR)
```
Load the Data
Read the users and viewing history datasets.
```python
viewing_history_path = "video_streaming_data/viewing_history.csv"
users_path = "video_streaming_data/users.json"

viewing_history_df = spark.read.option("header", "true").csv(viewing_history_path)
users_df = spark.read.option("multiline", "true").json(users_path)
```
Task 1: Handle Missing Values
- Identify missing values in `viewing_history_df` and `users_df`.
- Apply `.fillna()` to replace nulls where applicable.
- Drop records with excessive missing values.
🔍 Hint
- Use `.fillna()` to replace nulls.
- Use `.dropna()` to remove records with excessive missing data.
- Use `when(...).otherwise(...)` with `sum()` to count nulls before and after cleaning.
🔑 Solution
```python
from pyspark.sql.functions import col, sum as _sum, when

# Get the null counts in viewing_history_df
print("Null counts in viewing_history_df:")
viewing_history_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in viewing_history_df.columns
]).show()

# Get the null counts in users_df
print("Null counts in users_df:")
users_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in users_df.columns
]).show()
```

```python
# Fill missing device_type with "Unknown"
viewing_history_df = viewing_history_df.fillna({"device_type": "Unknown"})

# Drop account_status column (mostly null)
if "account_status" in viewing_history_df.columns:
    viewing_history_df = viewing_history_df.drop("account_status")

# Drop rows missing user_id or watched_at (critical fields)
viewing_history_df = viewing_history_df.dropna(subset=["user_id", "watched_at"])

# Fill preferred_language with "Unknown"
users_df = users_df.fillna({"preferred_language": "Unknown"})

# Fill missing subscription_date with a placeholder
users_df = users_df.fillna({"subscription_date": "2020-01-01"})
```

```python
# Verify by re-running the null counts
print("Null counts in viewing_history_df:")
viewing_history_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in viewing_history_df.columns
]).show()

print("Null counts in users_df:")
users_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in users_df.columns
]).show()
```
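For future datasets, rather than hard-coding which column to drop, you can compute the null ratio per column first and then decide. A small sketch (run it before filling or dropping; the 80% threshold is an arbitrary example, not part of the lab solution):

```python
# Compute the fraction of nulls per column (the 0.8 threshold is an arbitrary example)
total_rows = viewing_history_df.count()
null_counts = viewing_history_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in viewing_history_df.columns
]).collect()[0].asDict()

mostly_null_columns = [c for c, n in null_counts.items() if total_rows > 0 and n / total_rows > 0.8]
print("Columns that are more than 80% null:", mostly_null_columns)
```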
Task 2: Remove Duplicates
User data (`users_df`) may contain duplicate records due to data exports, ingestion errors, or tracking issues. If not removed, these duplicates can cause duplicate rows in your joined dataset and inflate user-level metrics.

In this task, you will:
- Identify duplicate user records.
- Remove duplicates by retaining a single record per `user_id`.
🔍 Hint
- Use `.dropDuplicates(["user_id"])` to retain only one record per user.
- Use `.count()` before and after cleaning to verify.
- Focus on the `user_id` column, as this is the key used in joins.
🔑 Solution
```python
# Check for duplicates
users_total = users_df.count()
unique_user_ids = users_df.select("user_id").distinct().count()

# Print total rows, distinct user_ids, and duplicate rows
print(f"Total user records: {users_total}")
print(f"Unique user_id count: {unique_user_ids}")
print(f"Duplicate user rows: {users_total - unique_user_ids}")

# Remove duplicates based on user_id
users_df = users_df.dropDuplicates(["user_id"])

# Confirm cleanup
print(f"Users after deduplication: {users_df.count()} rows")
```
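`.dropDuplicates(["user_id"])` keeps an arbitrary record per user. If you instead want to keep the most recent record, a minimal sketch uses a window over `subscription_date` (assuming that field reflects recency):

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Keep the newest record per user_id (assumes subscription_date indicates recency)
latest_window = Window.partitionBy("user_id").orderBy(col("subscription_date").desc())
users_latest_df = (
    users_df.withColumn("rn", row_number().over(latest_window))
    .filter(col("rn") == 1)
    .drop("rn")
)
print(f"Users after keeping the latest record per user_id: {users_latest_df.count()} rows")
```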
Task 3: Convert Data Types
Ensure that critical fields like `user_id` and `watched_at` are using the correct types: `IntegerType` for IDs and `TimestampType` for datetime fields.

🔍 Hint
- Use `.cast()` to convert `user_id` to integer.
- Use `to_timestamp()` to convert string timestamps into an actual timestamp type for `watched_at`.
- Always verify with `.printSchema()` after conversions.
🔑 Solution
```python
# Check the current schema
viewing_history_df.printSchema()

from pyspark.sql.functions import to_timestamp, col

# Convert user_id to IntegerType
viewing_history_df = viewing_history_df.withColumn("user_id", col("user_id").cast("int"))

# Convert watched_at to TimestampType
viewing_history_df = viewing_history_df.withColumn("watched_at", to_timestamp(col("watched_at")))

# Confirm the changes
viewing_history_df.printSchema()
viewing_history_df.select("user_id", "watched_at").show(5)
```
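If the bare `to_timestamp(col("watched_at"))` call yields nulls, the strings likely use a pattern Spark doesn't recognize by default. A minimal sketch of parsing with an explicit format; the pattern shown is an assumption, so match it to what the raw column actually contains:

```python
from pyspark.sql.functions import to_timestamp, col

# Re-read the raw CSV and try an explicit pattern (the pattern is an assumption)
raw_df = spark.read.option("header", "true").csv(viewing_history_path)
raw_df.select(
    to_timestamp(col("watched_at"), "yyyy-MM-dd HH:mm:ss").alias("watched_at_parsed")
).show(5, truncate=False)
```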
Task 4: Join Datasets
Join `viewing_history_df` with `users_df` to enrich viewing records.

🔍 Hint
- Use `.join()` with `on="user_id"` and `how="inner"` to merge DataFrames.
- Ensure both `user_id` columns are the same type before joining.
- You can preview with `.select(...).show()` after the join to verify it worked.
🔑 Solution
```python
# Optional: cast user_id in both DataFrames
viewing_history_df = viewing_history_df.withColumn("user_id", col("user_id").cast("int"))
users_df = users_df.withColumn("user_id", col("user_id").cast("int"))

# Make sure both DataFrames are clean and have user_id
viewing_history_df.select("user_id").show(3)
users_df.select("user_id").show(3)

# Join both DataFrames
full_df = viewing_history_df.join(users_df, on="user_id", how="inner")

# Preview the results
full_df.select("user_id", "video_id", "watched_at", "email", "preferred_language").show(5)
```
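One optional design choice: when the user table is small relative to the viewing history, broadcasting it lets Spark avoid shuffling the large side of the join. A sketch, not required for this lab:

```python
from pyspark.sql.functions import broadcast

# Broadcast the smaller DataFrame so the join avoids shuffling viewing_history_df
full_df = viewing_history_df.join(broadcast(users_df), on="user_id", how="inner")
full_df.select("user_id", "video_id", "watched_at").show(5)
```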
Task 5: Aggregation & Window Functions
Use PySpark to generate key metrics from enriched viewing data:
- How many times each user watched each video
- How many total views each user has
- (Optional) Rank most-watched videos per user
🔍 Hint
- Use `.groupBy()` and `.agg(count("*"))` for aggregating metrics like watch counts.
- Use `Window.partitionBy(...).orderBy(...)` with `row_number()` to rank within groups.
- Remember to import `Window` from `pyspark.sql.window` and `row_number` from `pyspark.sql.functions`.
🔑 Solution
Count Views per User per Video

```python
from pyspark.sql.functions import count

# Count how many times each user watched each video
most_watched_df = full_df.groupBy("user_id", "video_id").agg(
    count("*").alias("watch_count")
)

# Show the most watched videos overall
most_watched_df.orderBy("watch_count", ascending=False).show(10)
```

If you get an `IllegalStateException` error message, restart the kernel and try again.

Add Total Views per User

```python
# Count total watch events per user
total_views_df = full_df.groupBy("user_id").agg(
    count("*").alias("total_views")
)

# Show the most active users
total_views_df.orderBy("total_views", ascending=False).show(10)
```

Rank Top 3 Most-Watched Videos per User

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Create a window spec to rank videos within each user
window_spec = Window.partitionBy("user_id").orderBy(col("watch_count").desc())

# Add a ranking column
ranked_df = most_watched_df.withColumn("rank", row_number().over(window_spec))

# Filter to get the top 3 videos per user
top_videos_per_user = ranked_df.filter(col("rank") <= 3)

# Show results
top_videos_per_user.orderBy("user_id", "rank").show(10)
```
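Note that `row_number()` assigns a unique position even when two videos have the same `watch_count`, so ties are broken arbitrarily. If you'd rather let tied videos share a rank, a minimal variant swaps in `dense_rank()`:

```python
from pyspark.sql.functions import dense_rank

# Tied watch_counts share a rank, so more than 3 rows per user may pass the filter
ranked_ties_df = most_watched_df.withColumn("rank", dense_rank().over(window_spec))
ranked_ties_df.filter(col("rank") <= 3).orderBy("user_id", "rank").show(10)
```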
Task 6: Apply UDFs & Vectorized UDFs
Use a User-Defined Function (UDF) to normalize the `device_type` column, standardizing inconsistent labels like `"iPhone 13"`, `"IPHONE"`, and `"android-tablet"` into clean categories like `"iPhone"`, `"Android"`, and `"Other"`.

🔍 Hint
- Define a Python function to standardize messy string fields like `device_type`.
- Use `udf()` to register it with PySpark.
- Apply it with `.withColumn("new_column", your_udf(col("device_type")))`.
- Preview the result using `.select("device_type", "normalized_device").show()`.
🔑 Solution
```python
# Preview device types
viewing_history_df.select("device_type").distinct().show()
```

Define a Python UDF to clean device labels

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the normalization logic
def normalize_device(device):
    if device is None:
        return "Unknown"
    device = device.lower()
    if "iphone" in device:
        return "iPhone"
    elif "android" in device:
        return "Android"
    else:
        return "Other"

# Register as a PySpark UDF
normalize_device_udf = udf(normalize_device, StringType())
```

Apply the UDF to your DataFrame

```python
# Create a new column with normalized values
viewing_history_df = viewing_history_df.withColumn(
    "normalized_device", normalize_device_udf(viewing_history_df["device_type"])
)
```

Inspect the result

```python
viewing_history_df.select("device_type", "normalized_device").distinct().show()
```
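The task title also mentions vectorized UDFs. A minimal sketch of the same normalization as a pandas (vectorized) UDF, assuming `pandas` and `pyarrow` are available in the environment:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def normalize_device_vec(devices: pd.Series) -> pd.Series:
    # Same logic as the row-based UDF, applied to a pandas Series per batch
    def normalize(device):
        if device is None or pd.isna(device):
            return "Unknown"
        device = device.lower()
        if "iphone" in device:
            return "iPhone"
        if "android" in device:
            return "Android"
        return "Other"
    return devices.map(normalize)

viewing_history_df.select(
    "device_type", normalize_device_vec(col("device_type")).alias("normalized_device_vec")
).distinct().show()
```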
Validation
Once you've completed all tasks in this step, you will have:
- Replaced or removed missing values using `.fillna()` and `.dropna()`
- Removed exact and partial duplicates using `.dropDuplicates()`
- Converted `user_id` to Integer and `watched_at` to Timestamp for consistency
- Joined `viewing_history_df` with `users_df` to enrich the dataset
- Calculated watch count metrics using `.groupBy().agg()`
- Ranked top watched videos per user using window functions
- Normalized inconsistent device names using a custom UDF
This ensures the dataset is ready for further analysis and modeling.
Step 3: Loading Data Efficiently
To get started, open the file on the right titled Step 3. You'll complete each task for Step 3 in that Jupyter Notebook file. Remember, you must run the cells (Control/Command (⌘) + Enter) for each task before moving on to the next task in the Jupyter Notebook. Continue until you have completed all tasks in this step. When you are ready to move on to the next step, come back and click on the file for that step, and repeat until you have completed all tasks.

Background & Introduction
Once data is extracted and transformed, the next step is to store it efficiently for downstream analytics and reporting. Efficient data storage improves:
- Query Performance – Reduces scan time and improves retrieval speed
- Scalability – Helps manage large datasets effectively
- Storage Optimization – Reduces redundancy and improves data compression
In this step, you will optimize how data is stored in PySpark by:
- Writing data efficiently to Parquet format
- Using partitioning to speed up queries
- Implementing bucketing to improve lookup performance
Setup Instructions:
Initialize Spark Session and Reduce Verbose Logging
```python
# Import necessary libraries
from pyspark.sql import SparkSession
import logging

# Initialize Spark session
spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()

# Reduce verbose logging
spark.sparkContext.setLogLevel("ERROR")

# Configure logging to suppress warnings
logging.getLogger("py4j").setLevel(logging.ERROR)
```
Load the Data
Read viewing history datasets and create processed data.
```python
viewing_history_path = "video_streaming_data/viewing_history.csv"
viewing_history = spark.read.option("header", "true").csv(viewing_history_path)
processed_path = "video_streaming_data/processed"
```
Task 1: Write Data to Parquet
Description:
Parquet is a columnar storage format optimized for analytics. Writing data in Parquet format:
- Reduces file size due to built-in compression
- Speeds up query execution for analytical workloads

Instructions:
Save a sample of the `viewing_history` DataFrame in Parquet format to local disk. This helps you test the write process without overloading your system.

🔍 Hint
Use `.limit()` to reduce the number of rows and `.mode("overwrite")` to avoid duplicate writes. Avoid partitioning in this task to keep it lightweight.

🔑 Solution
```python
# Limit rows during development to reduce memory pressure in this lab environment
sample_df = viewing_history.limit(100000)

# Write data to Parquet format
sample_df.write.mode("overwrite").parquet(f"{processed_path}/viewing_history.parquet")

print("Viewing history successfully written to Parquet.")
```
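An optional sanity check: read the output back and confirm the row count matches the sample you wrote.

```python
# Read the Parquet output back and verify the row count
written_df = spark.read.parquet(f"{processed_path}/viewing_history.parquet")
print(f"Rows read back from Parquet: {written_df.count()}")
```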
Task 2: Implement Partitioning
Description:
Partitioning splits data into smaller chunks based on a column (e.g., `device_type`). This allows Spark to scan only the relevant partitions instead of reading the entire dataset.

Partitioning:
- Reduces query time by minimizing the number of files read
- Improves scalability by distributing data efficiently

Instructions:
Write the full `viewing_history` dataset with partitioning on the `device_type` column to optimize downstream queries.

🔍 Hint
Choose `device_type` as the partition column because it has low cardinality. This avoids performance issues caused by high-cardinality columns like `user_id`.

🔑 Solution
```python
# Write the full dataset with partitioning by device_type
viewing_history.write.partitionBy("device_type").mode("overwrite").parquet(f"{processed_path}/viewing_history_partitioned")

print("Viewing history data has been successfully written with partitioning!")

# Verify partitioning by listing the output folder
import os
partitioned_path = f"{processed_path}/viewing_history_partitioned"
print("Partitions created:", os.listdir(partitioned_path))

# Verify the partition column values
df_partitioned = spark.read.parquet(partitioned_path)
df_partitioned.select("device_type").distinct().show()
```
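To see partition pruning in action, filter on the partition column: Spark then reads only the matching `device_type=` folder(s). The literal value below is a placeholder; use one of the values printed by the `distinct()` call above.

```python
from pyspark.sql.functions import col

# "Mobile" is a placeholder -- substitute a device_type value that exists in your data
pruned_df = df_partitioned.filter(col("device_type") == "Mobile")
print(f"Rows in the selected partition: {pruned_df.count()}")
```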
Task 3: Implement Bucketing
Description:
Bucketing distributes data into a fixed number of buckets based on a column, optimizing queries that involve filtering on that column.

Bucketing:
- Reduces shuffle operations in queries
- Optimizes joins and lookups by pre-sorting data within each bucket
- Speeds up queries when filtering by the bucketed column

Instructions:
- Bucket `viewing_history` by `video_id`.
- Use `.sortBy("video_id")` for faster lookups.
- Print a success message after the write operation.

🔍 Hint
Use `.write.bucketBy(bucket_count, "column_name").sortBy("column_name").format("parquet")` to create bucketed tables.

🔑 Solution
```python
# Bucket data by video_id and sort within each bucket
(
    viewing_history.write.bucketBy(10, "video_id")
    .sortBy("video_id")
    .mode("overwrite")
    .format("parquet")
    .option("path", f"{processed_path}/viewing_history_bucketed")
    .saveAsTable("viewing_history_bucketed")
)

print("Data successfully written with bucketing!")
```
Verifying That Bucketing Was Applied
Since Spark registers bucketed tables in its catalog (stored by default under `/spark-warehouse`), you need to query the table through Spark's metastore rather than reading the Parquet files directly.
1. Check If the Table Exists

```python
# Check if the table exists
spark.catalog.listTables()
```

Expected Output: The `viewing_history_bucketed` table should appear in the list.

2. Verify Bucketing in the Table Schema

```python
# Verify bucketing in the table schema
spark.sql("DESCRIBE FORMATTED viewing_history_bucketed").show()
```

Expected Output: The output should include "Num Buckets" with a value of 10, confirming that bucketing is applied.
3. Read the Table from Spark's Metastore

Since the table is registered in Spark's metastore, you read it by table name:

```python
# Read the table from Spark's metastore
bucketed_df = spark.read.table("viewing_history_bucketed")
bucketed_df.show(5)
```

Expected Output: The table should load successfully, confirming that bucketing is applied.
4. Benchmark Query Performance

Before Bucketing (Slower Query)

```python
# Query execution time before bucketing
import time
from pyspark.sql.functions import col

start_time = time.time()
viewing_history.filter(col("video_id") == '56789').show()
end_time = time.time()

print(f"Query Execution Time (Before Bucketing): {end_time - start_time:.4f} seconds")
```

After Bucketing (Faster Query)

```python
# Query execution time after bucketing
start_time = time.time()
spark.sql("SELECT * FROM viewing_history_bucketed WHERE video_id = '56789'").show()
end_time = time.time()

print(f"Query Execution Time (After Bucketing): {end_time - start_time:.4f} seconds")
```

Expected Outcome: The query execution time on the bucketed table should be significantly lower than on the non-bucketed version.
Key Concepts Covered
Once you've completed all tasks in this step, you should have:
- Optimized data storage using Parquet
- Partitioned data for faster query performance
- Implemented bucketing to improve lookup efficiency
Summary
- Partitioning reduces query scan time by storing data in separate folders.
- Bucketing improves performance for filtering and joins on bucketed columns.
- Writing to Parquet ensures efficient storage and analytics.
These techniques significantly optimize an ETL pipeline, making data retrieval faster and more efficient.