Crossing the streams with Azure Event Hubs and Stream Analytics
Real-time data poses challenges for traditional architectures, like joining streams of data. Here's how to join streams with Azure Event Hubs and Azure Stream Analytics.
Jun 08, 2023 • 12 Minute Read
This blog provides a practical example of how to use Azure Stream Analytics to process streaming data from Azure Event Hubs. You should be able to go through this tutorial using the Azure Portal (or Azure CLI), without writing any code. There are also other resources for exploring stream processing with Azure Stream Analytics at the end of this blog post.
What’s covered?
- A quick overview of the use case, solution, and its constituents
- How to setup the required Azure services: Event Hubs, Stream Analytics and Blob storage
- How to configure and test your Azure Stream Analytics Job with sample data
- How to run your Azure Stream Analytics Job and test it with real-time data
Want to learn more about Azure certifications?
Check out our Azure Certifications and Learning Paths.
Overview
Azure Stream Analytics is a real-time analytics and complex event-processing engine designed to analyze and process high volumes of fast-streaming data from multiple sources simultaneously. It supports the notion of a Job, each of which consists of an input, a query, and an output. Azure Stream Analytics can ingest data from Azure Event Hubs (including Azure Event Hubs for Apache Kafka), Azure IoT Hub, or Azure Blob Storage. The query, which is based on the SQL query language, can be used to easily filter, sort, aggregate, and join streaming data over a period of time.
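To make the Job/input/query/output relationship concrete, here is a minimal pass-through query; orders-input and orders-output are hypothetical placeholder aliases for an input and an output defined on the job:

```sql
-- Minimal Stream Analytics query: read every event from the input alias
-- and route it unchanged into the output alias.
SELECT
    *
INTO
    [orders-output]
FROM
    [orders-input]
```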
Assume you have an application that accepts orders from customers and sends them to Azure Event Hubs. The requirement is to process the "raw" orders data and enrich it with additional customer info such as name, email, location, etc. To get this done, you can build a downstream service that consumes these orders from Event Hubs and processes them. In this example, this service happens to be an Azure Stream Analytics job (which we’ll explore later, of course!).
You can use Kafka to handle messaging as part of your infrastructure. Want to learn more about producing and consuming messages using simple HTTP requests? Try our hands-on labs to get experience with the requests necessary for consuming Kafka data using REST Proxy.
To build such a service ourselves, we would need to fetch the customer data from an external system (for example, a database) and, for each customer ID in the order info, query that system for the customer details. This will suffice for systems with low-velocity data or where end-to-end processing latency isn’t a concern, but it poses a challenge for real-time processing of high-velocity streaming data.
Of course, this is not a novel problem! The purpose of this blog post is to showcase how you can use Azure Stream Analytics to implement a solution. Here are the individual components:
Input data source
Azure Stream Analytics jobs connect to one or more data inputs. Each input defines a connection to an existing data source — in this case, it's Azure Event Hubs.
An individual order is a JSON payload that looks like this:
{ "id": "42", "custid": "4", "amount": "100" }
Reference data
Customer information is provided as reference data. Although customer information is likely to change over time (e.g., if the customer changes her phone number), for the purposes of this example we’ll treat it as static reference data stored in an Azure Blob Storage container.
Query
This is the workhorse of our solution! It joins (a continuous stream of) orders data from Azure Event Hubs with the static reference customers data, based on the matching customer ID (which is id in the customers data set and custid in the orders stream).
Output sink
Simply put, an output lets you store the results of the Stream Analytics job. In this example, to keep things simple, we continue to use Azure Event Hubs (a different topic) as the output.
Now that you have a conceptual overview, it's time to dive in. All you need is an Azure account. If you don't have it already, just grab one for free.
Initial setup
In this section, you'll:
- Create an Azure Event Hubs namespace and topics
- Create an Azure Blob Storage account and container
- Create an Azure Stream Analytics job and configure Event Hubs and Blob Storage inputs for the job
Azure Event Hubs
You need to create an Event Hubs namespace and hubs (topics). There are lots of options, including the Azure Portal, Azure CLI, ARM templates, or Azure PowerShell (see the Azure CLI sketch after the list below).
Azure Resource Manager (ARM) templates provide a powerful way to define Azure resources and configuration using a text file. Use our hands-on labs to learn how to locate and leverage Microsoft’s public Azure quickstart templates.
Please note that you need to create two topics:
- Input (you can name this orders): Azure Stream Analytics will use this as a (streaming) "source" for orders data
- Output (you can name this customer-orders): Azure Stream Analytics will use this as a "sink" to store the enriched data as processed by the query
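If you prefer the Azure CLI, the commands below are a rough sketch of this setup. The resource group name, namespace name, and location are placeholders I've chosen for illustration; note that the Kafka endpoint used later in this post requires the Standard tier or above.

```bash
# Create a resource group to hold everything (names/location are placeholders)
az group create --name streaming-demo-rg --location eastus

# Create the Event Hubs namespace (Standard tier enables the Kafka endpoint)
az eventhubs namespace create \
  --resource-group streaming-demo-rg \
  --name my-orders-namespace \
  --location eastus \
  --sku Standard

# Create the input and output hubs (topics)
az eventhubs eventhub create \
  --resource-group streaming-demo-rg \
  --namespace-name my-orders-namespace \
  --name orders

az eventhubs eventhub create \
  --resource-group streaming-demo-rg \
  --namespace-name my-orders-namespace \
  --name customer-orders
```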
Azure Blob Storage
You’ll need to create an Azure Storage account. This quickstart walks you through this process and provides guidance for Azure Portal, Azure CLI, etc. Once that's done, go ahead and create a container using the Azure Portal or the Azure CLI if you prefer.
Save the JSON below to a file and upload it to the storage container you just created (see the Azure CLI sketch after the JSON).
[ { "id": "1", "name": "Willis Collins", "location": "Dallas" }, { "id": "2", "name": "Casey Brady", "location": "Chicago" }, { "id": "3", "name": "Walker Wong", "location": "San Jose" }, { "id": "4", "name": "Randall Weeks", "location": "San Diego" }, { "id": "5", "name": "Gerardo Dorsey", "location": "New Jersey" } ]
Azure Stream Analytics
Start by creating an Azure Stream Analytics job. If you want to use the Azure Portal, just follow the steps outlined in this section, or use the Azure CLI if you prefer not to click through a UI (a rough sketch follows).
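The Stream Analytics commands live in the stream-analytics CLI extension, and the exact flag names (e.g., --job-name vs. --name) can vary between extension versions, so treat this as an outline rather than a copy-paste recipe; the job name and resource group are placeholders:

```bash
# Add the Stream Analytics CLI extension (one-time)
az extension add --name stream-analytics

# Create the job
az stream-analytics job create \
  --resource-group streaming-demo-rg \
  --job-name orders-enrichment-job \
  --location eastus
```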
To configure Azure Event Hubs Input
Open the Azure Stream Analytics job you just created and configure Azure Event Hubs as an Input. Here are some screenshots which should guide you through the steps:
Choose Inputs from the menu on the left
Select + Add stream input > Event Hub
Enter the Event Hubs details — the portal lets you choose from the existing Event Hubs namespaces and event hubs in your subscription, so all you need to do is pick the right one.
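If you'd rather script the input instead of using the portal, the definition is roughly the following JSON shape (as used by ARM templates and the REST API for Microsoft.StreamAnalytics/streamingjobs/inputs); all values here are placeholders:

```json
{
  "name": "orders",
  "properties": {
    "type": "Stream",
    "datasource": {
      "type": "Microsoft.ServiceBus/EventHub",
      "properties": {
        "serviceBusNamespace": "my-orders-namespace",
        "eventHubName": "orders",
        "sharedAccessPolicyName": "RootManageSharedAccessKey",
        "sharedAccessPolicyKey": "<key>"
      }
    },
    "serialization": {
      "type": "Json",
      "properties": { "encoding": "UTF8" }
    }
  }
}
```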
To configure Azure Blob Storage Input:
Choose Inputs from the menu on the left
Select Add reference input > Blob storage
Enter/choose Blob Storage details
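The equivalent (approximate) definition for the Blob Storage reference input looks like this; again, the account, container, and path values are placeholders:

```json
{
  "name": "customers",
  "properties": {
    "type": "Reference",
    "datasource": {
      "type": "Microsoft.Storage/Blob",
      "properties": {
        "storageAccounts": [
          { "accountName": "mycustomerdatastore", "accountKey": "<key>" }
        ],
        "container": "customers",
        "pathPattern": "customers.json"
      }
    },
    "serialization": {
      "type": "Json",
      "properties": { "encoding": "UTF8" }
    }
  }
}
```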
Once you're done, you should see both inputs (Event Hubs and Blob Storage) listed for the job.
Configure the query and test with sample data
Azure Stream Analytics allows you to test your streaming queries with sample data. In this section, we’ll upload sample data for orders and customer information for the Event Hubs and Blob Storage inputs respectively.
Upload sample data for orders:
Open the Azure Stream Analytics job, select Query and upload sample orders data for the Event Hubs input
Save the JSON below to a file and upload it.
[ { "id": "42", "custid": "1", "amount": "100" }, { "id": "43", "custid": "2", "amount": "200" }, { "id": "44", "custid": "3", "amount": "300" }, { "id": "45", "custid": "3", "amount": "150" }, { "id": "46", "custid": "4", "amount": "170" }, { "id": "47", "custid": "5", "amount": "150" }, { "id": "48", "custid": "5", "amount": "200" } ]
Upload sample data for customers
Open the Azure Stream Analytics job, select Query and upload sample customers data for the Blob Storage input
You can upload the same JSON file that you uploaded to Blob Storage earlier.
Now, configure and run the below query:
SELECT o.id as order_id, o.amount as purchase, o.custid as customer_id, c.name as customer_name, c.location as customer_location FROM orders o JOIN customers c ON o.custid = c.id
Open the Azure Stream Analytics job, select Query and follow the steps as depicted in the screenshot below:
Select Query > enter the query > Test query and don't forget to select Save query
The query JOINs the orders data from Event Hubs with the static reference customers data (from Blob Storage), based on the matching customer ID (which is id in the customers data set and custid in the orders stream).
Explore reference data JOIN operations or dig into the Stream Analytics query reference. A sketch of a slightly more involved query follows.
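As a taste of what the query language can do beyond a plain join, here is a hedged sketch (not part of this tutorial's job) that aggregates enriched orders per customer location over one-minute tumbling windows; it assumes the Event Hubs input is timestamped by the built-in EventEnqueuedUtcTime property:

```sql
-- Count orders and sum amounts per customer location, per 1-minute window
SELECT
    c.location AS customer_location,
    COUNT(*) AS order_count,
    SUM(CAST(o.amount AS bigint)) AS total_amount
FROM
    orders o TIMESTAMP BY EventEnqueuedUtcTime
JOIN
    customers c ON o.custid = c.id
GROUP BY
    c.location,
    TumblingWindow(minute, 1)
```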
Test the Job with streaming data
It was nice to have the ability to use sample data for testing our streaming solution. Let's go ahead and try this end to end with actual data (orders) flowing into Event Hubs.
An Output is required in order to run a Job. To configure the output, select Outputs > + Add > Event Hub
Enter the Event Hubs details: the portal lets you choose from the existing Event Hubs namespaces and event hubs in your subscription, so all you need to do is pick the right one (choose the customer-orders topic you created earlier as the output).
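If you're scripting this step too, the output definition has roughly this shape (same caveats as before: placeholder values, and property names as used by ARM templates/REST for Stream Analytics outputs):

```json
{
  "name": "customer-orders",
  "properties": {
    "datasource": {
      "type": "Microsoft.ServiceBus/EventHub",
      "properties": {
        "serviceBusNamespace": "my-orders-namespace",
        "eventHubName": "customer-orders",
        "sharedAccessPolicyName": "RootManageSharedAccessKey",
        "sharedAccessPolicyKey": "<key>"
      }
    },
    "serialization": {
      "type": "Json",
      "properties": { "encoding": "UTF8", "format": "LineSeparated" }
    }
  }
}
```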
Start the Job
In the Azure Stream Analytics interface, select Overview, click Start and confirm.
Wait for the job to start; you should see the Status change to Running.
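If you created the job with the Azure CLI, starting it looks roughly like this (again, flag names may differ between versions of the stream-analytics extension, and the job/resource group names are the placeholders used earlier):

```bash
# Start the job and begin processing from the time the job starts
az stream-analytics job start \
  --resource-group streaming-demo-rg \
  --job-name orders-enrichment-job \
  --output-start-mode JobStartTime
```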
Test the end to end flow
To keep things simple, we can use the kafkacat CLI to produce orders and consume enriched events (instead of writing a program). Just install it and you should be good to go.
Note: Although I have used kafkacat, you're free to choose any other mechanism (CLI or programmatic). This documentation provides lots of examples.
Create a kafkacat.conf file with the Event Hubs info:
metadata.broker.list=<namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=
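The value of sasl.password is the namespace connection string. If you created the namespace with the Azure CLI, you can fetch it with something like this (assuming the default RootManageSharedAccessKey policy and the placeholder names used earlier):

```bash
# Print the connection string for the namespace-level shared access policy
az eventhubs namespace authorization-rule keys list \
  --resource-group streaming-demo-rg \
  --namespace-name my-orders-namespace \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString \
  --output tsv
```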
Start a consumer to listen to the Event Hubs output topic
Let's first start the consumer process that will connect to the output topic (customer-orders), which will receive the enriched order information from Azure Stream Analytics.
In a terminal:
export KAFKACAT_CONFIG=kafkacat.conf
kafkacat -b <namespace>.servicebus.windows.net:9093 -t customer-orders

//output
% Reading configuration from file kafkacat.conf
% Auto-selecting Consumer mode (use -P or -C to override)
This will block, waiting for records from customer-orders.
In another terminal, start sending order info to the orders topic:
kafkacat -P -b <namespace>.servicebus.windows.net:9093 -t orders
You can send order data via stdin. Simply paste these one at a time and observe the output in the other terminal:
{"id": "22","custid": "1","amount": "100"} {"id": "23","custid": "2","amount": "200"} {"id": "24","custid": "3","amount": "300"} {"id": "25","custid": "4","amount": "400"} {"id": "26","custid": "15","amount": "500"}
The output you see on the consumer terminal should be similar to this:
...
% Reached end of topic customer-orders [0] at offset 0
{"order_id":"22","purchase":"100","customer_id":"1","customer_name":"Willis Collins","customer_location":"Dallas"}
% Reached end of topic customer-orders [0] at offset 1
{"order_id":"23","purchase":"200","customer_id":"2","customer_name":"Casey Brady","customer_location":"Chicago"}
...
Notice how the order info is now enriched with customer data (name and location in this case). You can use the information in this topic any way you want. For example, you can persist this enriched materialized view to Azure Cosmos DB, trigger an Azure Function, etc.
As expected, you won’t see an enriched event for orders placed by customers whose ID isn’t present in the reference customer data (in Blob Storage), since the JOIN criterion is the customer ID.
This brings us to the end of this tutorial! I hope it helps you get started with Azure Stream Analytics and test the waters before moving on to more involved use cases.
Where to go next?
In addition to this, there’s plenty of material for you to dig into.
- Explore Architecture patterns
- Dive into reference solutions, such as Twitter sentiment analysis, fraud detection, or IoT data processing
- Learn about common query patterns in Azure Stream Analytics
- Learn how to use VS Code for local development
- Check out this GitHub repo with examples and sample data
Conclusion
High-velocity, real-time data poses challenges that are hard to deal with using traditional architectures — one such problem is joining these streams of data. Depending on the use case, a custom-built solution might serve you better, but this will take a lot of time and effort to get it right. If possible, you might want to think about extracting parts of your data processing architecture and offloading the heavy lifting to services which are tailor-made for such problems.
In this blog post, we explored a possible solution for implementing streaming joins using a combination of Azure Event Hubs for data ingestion and Azure Stream Analytics for data processing using SQL. These are powerful, off-the-shelf services that you can configure and use without setting up any infrastructure, and, thanks to the cloud, the underlying complexity of the distributed systems involved in such solutions is completely abstracted away.
Want to learn more about Azure certifications?
Check out our Azure Certifications and Learning Paths.