
Building a serverless Kafka data pipeline

Learn how to set up a serverless Kafka cluster using Amazon MSK Serverless, and how to create serverless producers and consumers.

Jun 08, 2023 • 10 Minute Read


Serverless services allow us to build applications without having to worry about the underlying infrastructure. When it comes to Apache Kafka, this allows developers to avoid provisioning, scaling, and managing resource utilization of their clusters. In this post, I walk through setting up a serverless Kafka cluster using Amazon MSK Serverless and show how to create serverless producers and consumers.


How to start a serverless Kafka cluster

Apache Kafka is a distributed event store and stream-processing platform. It is highly scalable and resilient, which makes it a good mechanism for decoupling data processing from the systems that produce the data and the destinations that receive it.

Kafka is highly customizable to fit your workload, but that flexibility introduces operational overhead. Serverless Kafka makes a tradeoff: you lose the ability to configure your cluster's capacity, but you gain the ability to use Kafka through a single interface that simply gives clients an endpoint.

To get started on AWS, you can create a cluster from the Amazon MSK console page.


With Serverless, you don't have to worry about configuration. You can simply continue and have a cluster set up in minutes.


The default limits of 200 MiB/s for ingress and 400 MiB/s for egress are a good fit for getting started with Kafka workloads when you don't yet know how much data you need to process. If your workload grows and the serverless cluster can't keep up with the default write and read throughput, consider a provisioned Amazon MSK cluster, which gives you the flexibility to tune the cluster for your workload.
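To judge whether a workload fits under these defaults, a quick back-of-the-envelope calculation helps. The sketch below treats the limits as mebibytes per second and assumes that each consumer group reads every message once, so egress is roughly ingress multiplied by the number of consumer groups. The function name and the example workloads are hypothetical.

```python
# Default MSK Serverless throughput limits quoted above, in MiB/s.
MAX_INGRESS_MIB_S = 200
MAX_EGRESS_MIB_S = 400
BYTES_PER_MIB = 1024 * 1024


def fits_default_quota(msgs_per_sec: float, avg_msg_bytes: int, consumer_groups: int) -> bool:
    """Rough check of whether a workload stays under the default limits.

    Assumes every consumer group reads every message exactly once, so
    egress is ingress multiplied by the number of consumer groups.
    """
    ingress_mib_s = msgs_per_sec * avg_msg_bytes / BYTES_PER_MIB
    egress_mib_s = ingress_mib_s * consumer_groups
    return ingress_mib_s <= MAX_INGRESS_MIB_S and egress_mib_s <= MAX_EGRESS_MIB_S


# 50,000 messages/s of 1 KiB is about 48.8 MiB/s in; three groups read about 146.5 MiB/s.
print(fits_default_quota(50_000, 1024, 3))   # True
# 300,000 messages/s of 1 KiB is about 293 MiB/s in, above the 200 MiB/s ingress limit.
print(fits_default_quota(300_000, 1024, 1))  # False
```

This ignores any finer-grained limits (for example, per partition), so treat it as a first-pass estimate rather than a capacity plan.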

Once the cluster is up, you can find the endpoint that clients need on the Properties tab in the console.


For more information on setting up the cluster, check out the official Amazon MSK documentation.

Creating serverless Kafka clients

Once the cluster is up, we can create Kafka clients to send data to the cluster and consume from it. In this section I will explain how to:

  • Set up the required IAM permissions for our Lambda function
  • Build a Docker image for the Lambda function
  • Configure the Lambda function to communicate with the serverless Kafka cluster

Setting permissions

Currently, IAM-based authentication is required to communicate with the cluster. The following is an example policy that can be used for the clients. Be sure to replace REGION, Account-ID, and CLUSTER_NAME with your values.

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [ ... ],
           "Resource": [ ... ]
       },
       {
           "Effect": "Allow",
           "Action": [ ... ],
           "Resource": [ ... ]
       },
       {
           "Effect": "Allow",
           "Action": [ ... ],
           "Resource": [ ... ]
       }
   ]
}

Next, we can create a new role for the Lambda function and attach the policy above to the role.


Additionally, the role will need to have the AWSLambdaVPCAccessExecutionRole policy, as the function needs to be deployed in the same VPC as the serverless cluster.
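The permissions in this section can be sketched in Python: a function that renders a client policy for your REGION, Account-ID, and CLUSTER_NAME values, the trust policy that lets Lambda assume the new role, and the ARN of the AWSLambdaVPCAccessExecutionRole managed policy. The function name and the exact list of kafka-cluster actions are assumptions modeled on the IAM actions AWS documents for MSK clients, not necessarily the policy used in this post; trim them to what your clients actually need.

```python
import json
from typing import Any, Dict, List


def build_client_policy(region: str, account_id: str, cluster_name: str) -> Dict[str, Any]:
    """Return an IAM policy letting a client use one cluster's topics and groups.

    Hypothetical helper; the action lists are an assumption, not the post's policy.
    """
    arn = f"arn:aws:kafka:{region}:{account_id}"

    def allow(actions: List[str], resource_type: str) -> Dict[str, Any]:
        # One Allow statement scoped to every resource of this type in the cluster
        return {
            "Effect": "Allow",
            "Action": actions,
            "Resource": [f"{arn}:{resource_type}/{cluster_name}/*"],
        }

    return {
        "Version": "2012-10-17",
        "Statement": [
            # Connect to and describe the cluster
            allow(["kafka-cluster:Connect", "kafka-cluster:AlterCluster",
                   "kafka-cluster:DescribeCluster"], "cluster"),
            # Create and describe topics, and write/read their data
            allow(["kafka-cluster:*Topic*", "kafka-cluster:WriteData",
                   "kafka-cluster:ReadData"], "topic"),
            # Join and describe consumer groups
            allow(["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"], "group"),
        ],
    }


# Trust policy that lets the Lambda service assume the new role.
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# AWS managed policy for functions that run inside a VPC.
VPC_ACCESS_POLICY_ARN = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"

print(json.dumps(build_client_policy("REGION", "Account-ID", "CLUSTER_NAME"), indent=2))
```

The printed JSON and the trust policy can be pasted into the IAM console when you create the policy and role, and the managed policy is attached to the same role by its ARN.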


Building a Docker image

For our Lambda function, we will use a custom container image that includes all the prerequisites for connecting to the Kafka cluster. Below is an example Dockerfile for building a Kafka client Lambda function. See the resources section at the bottom of this post for the handler code and the client configuration file it uses.

# Lambda base image
# Install Kafka prereqs
RUN yum -y install java-11 wget tar
RUN wget
RUN tar -xzf kafka_2.12-2.8.1.tgz
RUN wget
RUN mv aws-msk-iam-auth-1.1.1-all.jar ${LAMBDA_TASK_ROOT}/kafka_2.12-2.8.1/libs
# Remove tgz
RUN rm kafka_2.12-2.8.1.tgz
# Lambda code
# Run handler
CMD ["handler.action"]

Follow the Amazon ECR documentation to push your image to Amazon Elastic Container Registry (ECR). Once your image is in ECR, you can create a Lambda function from the container image. We'll need to update some settings once the function is created.

Configuring the Lambda function

First, we need to configure the Lambda function so it can interact with the cluster. In the basic settings, increase the memory to 1024 MB and the timeout to 1 minute, and select the IAM role we created above.


Next, we need to make sure the Lambda function uses the same VPC, subnets, and security group as the MSK cluster.


Finally, we need to add a KAFKA_ENDPOINT environment variable set to the MSK cluster endpoint shown under the “View client information” button.
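The settings from this section can also be collected in one place, which is handy if you script the deployment. This sketch assumes the keys of Lambda's UpdateFunctionConfiguration API (the VPC itself is implied by the subnets you pass); the helper name and all values shown are hypothetical placeholders.

```python
import json
from typing import Any, Dict, List


def lambda_settings(function_name: str, role_arn: str, subnet_ids: List[str],
                    security_group_ids: List[str], kafka_endpoint: str) -> Dict[str, Any]:
    """Collect this section's function settings.

    Hypothetical helper; keys follow Lambda's UpdateFunctionConfiguration parameters.
    """
    return {
        "FunctionName": function_name,
        "Role": role_arn,                 # the IAM role created earlier
        "MemorySize": 1024,               # MB
        "Timeout": 60,                    # seconds
        "VpcConfig": {                    # same subnets and security group as the cluster
            "SubnetIds": subnet_ids,
            "SecurityGroupIds": security_group_ids,
        },
        "Environment": {"Variables": {"KAFKA_ENDPOINT": kafka_endpoint}},
    }


# Placeholder values; replace them with your own.
settings = lambda_settings(
    "kafka-client",
    "arn:aws:iam::123456789012:role/kafka-client-role",
    ["subnet-aaaa1111", "subnet-bbbb2222"],
    ["sg-cccc3333"],
    "BOOTSTRAP_ENDPOINT",
)
print(json.dumps(settings, indent=2))
```

With boto3 installed, the same dictionary could be applied with `boto3.client("lambda").update_function_configuration(**settings)`.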


With that, we are ready to begin producing and consuming data.

Producing and consuming data

With Lambda, we can send test events through the “Test” tab in the console to verify that our container image is working. To test the produce action, we can send this simple payload to push data to my_topic.

{
 "action": "produce",
 "topic": "my_topic"
}

The producer code simply invokes the producer script with the passed-in parameters, using sample data.

--topic {topic}
--bootstrap-server {KAFKA_ENDPOINT}
--producer.config < /tmp/test.json
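Assuming the producer script is Kafka's standard kafka-console-producer.sh and the client settings live in a file such as client.properties (both assumptions), the same call can be built as an argument list and run without `shell=True`, feeding the sample file through stdin instead of the `<` redirect. The helper names are illustrative.

```python
import subprocess
from typing import List

KAFKA_HOME = "./kafka_2.12-2.8.1"  # relative to the task root, as in the handler


def producer_command(topic: str, bootstrap: str, config_path: str) -> List[str]:
    """Build the console-producer call as an argument list (no shell needed)."""
    # Assumption: the producer script is Kafka's standard console producer.
    return [
        f"{KAFKA_HOME}/bin/kafka-console-producer.sh",
        "--topic", topic,
        "--bootstrap-server", bootstrap,
        "--producer.config", config_path,
    ]


def produce_file(topic: str, bootstrap: str, config_path: str, data_path: str) -> None:
    """Send each line of data_path to the topic as a separate message."""
    with open(data_path, "rb") as data:
        # stdin=data plays the role of the shell redirect "< /tmp/test.json"
        subprocess.run(producer_command(topic, bootstrap, config_path),
                       stdin=data, check=True)


print(producer_command("my_topic", "BOOTSTRAP_ENDPOINT", "client.properties"))
```

Passing a list avoids shell quoting problems if a topic name or endpoint ever contains unexpected characters.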

Once the code runs, the MSK cluster metrics take a few minutes to update and show that data was received. You can view the metrics from the metrics tab on the cluster's main page.


We can then test the consumer code by updating the payload.

{
 "action": "consume",
 "topic": "my_topic"
}

Similarly, the consumer code invokes the consumer script with the passed-in parameters and writes the output to a sample file. Depending on your use case, the file can be uploaded to S3 or processed by other functions. The --timeout-ms parameter makes the script exit after 12 seconds without new messages; otherwise, it would keep running and wait for more input.

--topic {topic} --from-beginning
--bootstrap-server {KAFKA_ENDPOINT}
--consumer.config --timeout-ms 12000 > /tmp/output.json
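Assuming the consumer script is Kafka's standard kafka-console-consumer.sh and the client settings live in a file such as client.properties (both assumptions), the sketch below builds the consumer call as an argument list and parses the resulting file, which holds one message per line. `parse_output` is a hypothetical helper for the "other functions run on the data" step and assumes your messages are JSON objects like the sample data.

```python
import json
import subprocess
from typing import Any, Dict, List

KAFKA_HOME = "./kafka_2.12-2.8.1"  # relative to the task root, as in the handler


def consumer_command(topic: str, bootstrap: str, config_path: str,
                     timeout_ms: int = 12000) -> List[str]:
    """Build the console-consumer call as an argument list (no shell needed)."""
    # Assumption: the consumer script is Kafka's standard console consumer.
    return [
        f"{KAFKA_HOME}/bin/kafka-console-consumer.sh",
        "--topic", topic,
        "--from-beginning",
        "--bootstrap-server", bootstrap,
        "--consumer.config", config_path,
        # Exit once no message has arrived for timeout_ms milliseconds
        "--timeout-ms", str(timeout_ms),
    ]


def consume_to_file(topic: str, bootstrap: str, config_path: str, out_path: str) -> None:
    """Write every consumed message to out_path, one per line."""
    with open(out_path, "wb") as out:
        # stdout=out plays the role of "> /tmp/output.json"; the exit code is
        # not checked because the consumer is expected to stop on its timeout
        subprocess.run(consumer_command(topic, bootstrap, config_path), stdout=out)


def parse_output(text: str) -> List[Dict[str, Any]]:
    """Parse one JSON object per line, skipping blank or non-JSON lines."""
    records: List[Dict[str, Any]] = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            value = json.loads(line)
        except json.JSONDecodeError:
            continue  # not JSON; handle differently if your messages are plain text
        if isinstance(value, dict):
            records.append(value)
    return records


sample = '{"user":"Alice","number":105}\n{"user":"Bob","number":5}\n'
print([record["user"] for record in parse_output(sample)])  # ['Alice', 'Bob']
```

From here, the parsed records could be summarized, filtered, or written to S3, depending on your use case.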

All of the code is configurable to fit your use case, so feel free to use this as a starter guide and adapt as needed.


In this post, we discussed how to successfully set up the infrastructure needed to begin scaling out a serverless Kafka pipeline by:

  • Starting a serverless MSK cluster
  • Creating a Kafka client Docker image
  • Deploying a container-based Lambda function
  • Producing and consuming data through the Lambda console

This solution can be extended to fit other use cases, such as reading data from a database at periodic intervals using Amazon EventBridge or uploading processed messages to Amazon S3. The flexibility of Lambda, paired with Kafka’s ability to decouple data processing from its sources and destinations, provides the foundation for a serverless data pipeline.

Follow Banjo on Twitter at @banjtheman and @AWSDevelopers for more useful tips and tricks about the cloud in general and AWS.

About the author

Banjo is a Senior Developer Advocate at AWS, where he helps builders get excited about using AWS. Banjo is passionate about operationalizing data and has started a podcast, a meetup, and open-source projects around utilizing data. When not building the next big thing, Banjo likes to relax by playing video games, especially JRPGs, and exploring events happening around him.


sasl.mechanism=AWS_MSK_IAM required;
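The client configuration enables IAM authentication through the aws-msk-iam-auth library that the Dockerfile downloads into Kafka's libs folder. A sketch that renders such a file, assuming the four settings that library's README documents for IAM authentication, is shown below; the function names and any output path you choose are hypothetical.

```python
from pathlib import Path

# Assumption: settings taken from the aws-msk-iam-auth README for IAM auth.
IAM_CLIENT_SETTINGS = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def render_client_properties() -> str:
    """Return the settings as a Java .properties document, one key=value per line."""
    return "".join(f"{key}={value}\n" for key, value in IAM_CLIENT_SETTINGS.items())


def write_client_properties(path: str) -> None:
    """Write the file so the Kafka scripts can load it via --producer.config,
    --consumer.config, or --command-config."""
    Path(path).write_text(render_client_properties())


print(render_client_properties(), end="")
```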

This code provides a template for a Lambda function to interact with the serverless Kafka cluster. Because the cluster uses IAM-based authentication, the standard Kafka Python library could not connect to it, so I wrote a wrapper around Kafka's Java command-line scripts instead.

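import json
import logging
import os
import subprocess
from typing import List, Dict, Any

# Bootstrap endpoint of the MSK Serverless cluster, set as a Lambda environment variable
KAFKA_ENDPOINT = os.environ["KAFKA_ENDPOINT"]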


def action(event, context) -> Dict[str, Any]:
    """Entrypoint for an action on the cluster.

    event - data from Lambda ("action", "topic", optional "num_partitions")
    context - context data from Lambda
    Returns a JSON response.
    """
    body = {
        "message": "Running Action",
        "input": event,
    }
    logging.info(body)  # log the incoming request

    curr_action = event["action"]
    topic = event["topic"]

    # If creating topic, can specify number of partitions (defaults to 1)
    num_partitions = event.get("num_partitions", 1)

    if curr_action == "produce":
        response = produce(topic, num_partitions)
    elif curr_action == "consume":
        response = consume(topic)
    else:
        raise ValueError("Invalid action")

    return response

def consume(topic: str) -> Dict[str, Any]:
    """Consume data in a topic.

    topic - topic to consume on
    Returns a JSON response.
    """
    body = {
        "message": "Data consumed!!!",
    }

    # TODO can play with the timeout, on how long you want to collect data
    # TODO can also configure if you want to get data from the beginning or not
    cmd = f"./kafka_2.12-2.8.1/bin/ --topic {topic} --from-beginning --bootstrap-server {KAFKA_ENDPOINT} --consumer.config --timeout-ms 12000 > /tmp/output.json"

    # shell=True is needed for the "> /tmp/output.json" redirect; the exit code
    # is not checked because the consumer is expected to stop on its timeout
    subprocess.run(cmd, shell=True)

    # TODO
    # Do what you need to do with output.json, e.g. upload to S3, run analytics, etc.

    response = {"statusCode": 200, "body": json.dumps(body)}

    return response

def produce(topic: str, num_partitions: int) -> Dict[str, Any]:
    """Produce data in a topic.

    topic - topic to create
    num_partitions - number of partitions to use
    Returns a JSON response.
    """
    # TODO would input your process to get data to send to topic
    sample_data = '{"user":"Alice","number":105,"timestampInEpoch":1650880895}\n{"user":"Bob","number":5,"timestampInEpoch":1650880324}'

    # Write sample output to temp file (one message per line)
    if not write_to_file("/tmp/test.json", sample_data):
        raise RuntimeError("Could not write sample data")

    body = {
        "message": "Data produced!!!",
        "input": sample_data,
    }

    # Check if topic exists, if not create it
    topics = list_topics()
    if topic not in topics:
        if not create_topic(topic, num_partitions):
            raise RuntimeError("Topic not created")

    produce_topic_command = f"./kafka_2.12-2.8.1/bin/  --topic {topic} --bootstrap-server {KAFKA_ENDPOINT} --producer.config < /tmp/test.json"

    # shell=True is needed for the "< /tmp/test.json" redirect; fail loudly on errors
    subprocess.run(produce_topic_command, shell=True, check=True)

    response = {"statusCode": 200, "body": json.dumps(body)}

    return response

def create_topic(topic: str, num_partitions: int) -> bool:
    """Create topic in cluster.

    topic - topic to create
    num_partitions - number of partitions to use
    Returns True if created, False if not.
    """
    cmd = f"./kafka_2.12-2.8.1/bin/ --bootstrap-server {KAFKA_ENDPOINT} --command-config --create --topic {topic} --partitions {num_partitions}"

    output = subprocess.check_output(cmd, shell=True)
    output_string = output.decode("utf-8")
    outputs = output_string.split("\n")

    # Check if created
    success_string = f"Created topic {topic}."
    return success_string in outputs

def list_topics() -> List[str]:
    """List topics in cluster.

    Returns a list of topic names.
    """
    cmd = f"./kafka_2.12-2.8.1/bin/ --list --bootstrap-server {KAFKA_ENDPOINT} --command-config"

    # Run the command to get list of topics
    output = subprocess.check_output(cmd, shell=True)
    output_string = output.decode("utf-8")
    # One topic per line; drop empty lines such as the one after the trailing newline
    topics = [line for line in output_string.splitlines() if line]

    return topics

def test_produce_consume() -> None:
    """Test producing and consuming."""
    logging.info("testing produce")
    response = produce("my_topic", 1)
    logging.info(response)
    logging.info("testing consume")
    consume("my_topic")
    logging.info("Done and Done")

def write_to_file(file_path: str, file_text: str) -> bool:
    """Write text to a file.

    file_path - file path
    file_text - text of file
    Returns True if written, False if failed.
    """
    try:
        with open(file_path, "w") as myfile:
            myfile.write(file_text)
            return True

    except OSError as error:
        logging.error(error)
        return False

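if __name__ == "__main__":
    loglevel = logging.INFO
    logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)
    # Run a local produce/consume round trip (requires KAFKA_ENDPOINT to be set)
    test_produce_consume()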