How to Create a GenAI Powered Real-Time Data Processing Solution
Build a GenAI-powered real-time data solution with Kafka and a fine-tuned LLM for sentiment analysis, complaint detection, and automated Slack notifications.
Oct 2, 2024 • 7 Minute Read
As businesses increasingly rely on real-time data processing, integrating Generative AI (GenAI) into these pipelines can surface insights that help automate decisions. In this post, we’ll walk through building a real-time data processing solution that uses a fine-tuned Large Language Model (LLM) for sentiment analysis and complaint detection. We’ll use Apache Kafka to ingest store reviews as they come in, load the LLM directly in Python to analyze each review, and send Slack notifications when negative reviews or complaints are detected.
We’ll cover:
- Setting up Kafka for real-time data ingestion.
- Using the LLM for real-time sentiment analysis and complaint detection.
- Sending Slack notifications for negative reviews.
- Handling lag in Kafka with Python.
- Using Kafka Streams-like functionality with Streamz for advanced stream processing.
Let’s get started!
What is Kafka and Why Use It?
Apache Kafka is a distributed event-streaming platform designed to handle real-time data. It’s widely used for high-throughput, fault-tolerant messaging between systems, making it a popular choice for building real-time data pipelines. Kafka decouples data producers from consumers, and a topic is a natural fit for a continuous stream of store reviews.
Key Kafka Concepts:
- Producer: Sends messages (reviews) to Kafka topics.
- Topic: A named stream of records (reviews in this case).
- Consumer: Reads messages from Kafka topics and processes them.
- Broker: A Kafka server that stores and delivers messages.
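To make these roles concrete, here is a toy in-memory model of how they fit together. It is not Kafka: `ToyBroker` and its methods are hypothetical names used only for illustration, and real Kafka adds partitions, replication, and durable storage on top of this idea.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker: keeps an append-only log per topic."""

    def __init__(self):
        self.topics = defaultdict(list)   # topic name -> list of messages
        self.offsets = defaultdict(int)   # (group, topic) -> next offset to read

    def produce(self, topic, message):
        """Producer side: append a message and return its offset."""
        self.topics[topic].append(message)
        return len(self.topics[topic]) - 1

    def consume(self, group, topic, max_messages=10):
        """Consumer side: return unread messages for this group and advance its offset."""
        start = self.offsets[(group, topic)]
        messages = self.topics[topic][start:start + max_messages]
        self.offsets[(group, topic)] = start + len(messages)
        return messages


broker = ToyBroker()
broker.produce("store-reviews", "Great store!")
broker.produce("store-reviews", "Item arrived broken.")

first_read = broker.consume("alerts", "store-reviews")
second_read = broker.consume("alerts", "store-reviews")      # nothing new for this group
other_group = broker.consume("analytics", "store-reviews")   # separate group, separate offset
```

The point to notice is that messages stay in the log after being read, and each consumer group tracks its own position, so several applications can read the same reviews independently.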
Setting Up Kafka
We’ll first set up Kafka locally and create a topic to store our real-time reviews. This allows you to simulate a stream of data that can later be processed by the LLM.
Step 1: Install and Start Kafka
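Download Kafka from the Apache Kafka downloads page and extract the archive. The commands below assume a ZooKeeper-based release (Kafka 3.x or earlier); Kafka 4.0 and later run in KRaft mode without ZooKeeper. Start ZooKeeper, which this setup uses to coordinate brokers: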
# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
Then, start Kafka itself:
# Start Kafka broker
bin/kafka-server-start.sh config/server.properties
Step 2: Create a Kafka Topic
We’ll create a topic called store-reviews to hold our incoming reviews:
# Create the topic 'store-reviews'
bin/kafka-topics.sh --create --topic store-reviews --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Step 3: Produce Reviews to Kafka
Let’s simulate real-time store reviews being sent to Kafka:
from confluent_kafka import Producer
# Create a Kafka producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
# Produce some test reviews to Kafka
reviews = [
    "The service was great, but the product quality was lacking.",
    "Terrible customer support, I waited for hours!",
    "Amazing experience, highly recommend this store.",
    "The item broke within a week, very disappointed."
]
# Send each review to Kafka
for review in reviews:
    producer.produce('store-reviews', review.encode('utf-8'))
# Block until all queued messages have been delivered
producer.flush()
Each review is sent to the store-reviews topic in Kafka, simulating real-time data ingestion. Now, we’ll consume these reviews and process them using our fine-tuned LLM.
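The walkthrough sends each review as a plain UTF-8 string. If you later need metadata alongside the text, one option is a small JSON envelope. The sketch below is an optional variation, not part of the original pipeline: the field names (`review_id`, `store_id`, `created_at`) and the helper names are assumptions, and a consumer reading these messages would need to call `decode_review` instead of decoding plain text.

```python
import json
import time
import uuid


def encode_review(text, store_id):
    """Serialize a review plus metadata to UTF-8 JSON bytes for use as a Kafka value."""
    payload = {
        "review_id": str(uuid.uuid4()),   # hypothetical field: unique id per review
        "store_id": store_id,             # hypothetical field: which store was reviewed
        "text": text,
        "created_at": time.time(),        # seconds since the epoch
    }
    return json.dumps(payload).encode("utf-8")


def decode_review(raw_bytes):
    """Inverse of encode_review: parse the bytes a consumer receives."""
    return json.loads(raw_bytes.decode("utf-8"))


message = encode_review("Terrible customer support, I waited for hours!", store_id="store-42")
decoded = decode_review(message)
```

The resulting `message` bytes could be passed to `producer.produce('store-reviews', message)` in place of `review.encode('utf-8')`.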
Integrating TensorFlow with Kafka for Real-Time Processing
TensorFlow Serving Overview
To perform sentiment analysis and complaint detection, we’ll use a fine-tuned LLM in TensorFlow. Instead of calling a separate model server (such as TensorFlow Serving) over the network, we’ll load the model directly into the Kafka consumer process, which avoids a network round trip for every review.
Step 1: Load the Fine-Tuned LLM in Python
Assuming you’ve already fine-tuned your LLM and saved it in SavedModel format, load it using TensorFlow:
import tensorflow as tf
# Load the fine-tuned LLM from the saved model directory
model = tf.saved_model.load("/path_to_saved_model")
infer = model.signatures["serving_default"]
This loads your LLM, making it ready to analyze incoming reviews.
Step 2: Set Up the Kafka Consumer
Now, we’ll write a Kafka consumer that listens for new reviews, analyzes the sentiment using the LLM, and checks for common complaints.
from confluent_kafka import Consumer
import tensorflow as tf
# Note: `infer` comes from Step 1, and send_slack_message is defined in the
# Slack section below; both must exist before the loop at the bottom runs.
# Configure Kafka consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'llm-consumer-group',
    'auto.offset.reset': 'earliest'
})
# Subscribe to the 'store-reviews' topic
consumer.subscribe(['store-reviews'])
# Function to analyze sentiment using the LLM
def analyze_sentiment(review_text):
    input_tensor = tf.constant([review_text])  # Convert the input to a tensor
    # The call style and the 'predictions' output key depend on how your model was exported
    result = infer(input_tensor)  # Run inference
    sentiment_score = result['predictions'].numpy()[0][0]  # Get sentiment score
    return sentiment_score
# Function to detect complaints (simple rule-based check)
def detect_complaint(review_text):
    complaint_keywords = ["bad", "terrible", "poor", "disappointed", "broke"]
    return any(keyword in review_text.lower() for keyword in complaint_keywords)
# Start consuming reviews from Kafka
try:
    while True:
        msg = consumer.poll(1.0)  # Poll Kafka for new messages
        if msg is None:
            continue
        if msg.error():  # Skip error events instead of treating them as reviews
            print(f"Consumer error: {msg.error()}")
            continue
        review = msg.value().decode('utf-8')  # Decode the message
        print(f"Received review: {review}")
        # Analyze sentiment using the LLM
        sentiment_score = analyze_sentiment(review)
        print(f"Sentiment score: {sentiment_score}")
        # Check if the sentiment is negative or a complaint is detected
        if sentiment_score < 0.5 or detect_complaint(review):
            print("Negative sentiment or complaint detected!")
            send_slack_message(review)  # Trigger Slack notification
finally:
    consumer.close()  # Leave the consumer group cleanly on exit
This Kafka consumer listens for reviews, processes each one using the LLM for sentiment analysis, and detects common complaints based on simple keyword matching.
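One caveat with plain substring matching is that "broke" also matches "broker" and "bad" matches "badge". A possible refinement, sketched below, matches keywords only as whole words. The names `find_complaint_keywords` and `should_alert` are hypothetical helpers; the keyword list and the 0.5 threshold are taken from the consumer above.

```python
import re

COMPLAINT_KEYWORDS = ["bad", "terrible", "poor", "disappointed", "broke"]

# \b anchors each keyword to word boundaries, so "broke" does not match "broker".
COMPLAINT_PATTERN = re.compile(
    r"\b(?:" + "|".join(map(re.escape, COMPLAINT_KEYWORDS)) + r")\b",
    re.IGNORECASE,
)


def find_complaint_keywords(review_text):
    """Return the distinct complaint keywords found as whole words, lowercased and sorted."""
    return sorted({match.lower() for match in COMPLAINT_PATTERN.findall(review_text)})


def should_alert(sentiment_score, review_text, threshold=0.5):
    """Same rule as the consumer loop: alert on a low score or on any complaint keyword."""
    return sentiment_score < threshold or bool(find_complaint_keywords(review_text))


found = find_complaint_keywords("The item broke within a week, very disappointed.")
false_positive = find_complaint_keywords("Our broker gave us a badge.")
```

The trade-off is that whole-word matching also stops "broke" from matching "broken", so you may want to list such variants explicitly.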
Sending Slack Notifications
Once the LLM detects negative sentiment or complaints, we’ll send a Slack notification to alert the relevant team.
Step 1: Set Up a Slack Webhook
In Slack, go to Apps & Integrations and create an Incoming Webhook. You’ll get a Webhook URL that you’ll use to send messages to a Slack channel.
Step 2: Sending Slack Messages in Python
Here’s how to trigger a Slack message whenever a negative review or complaint is detected:
from slack_sdk.webhook import WebhookClient
# Initialize the webhook client with the Webhook URL from Step 1
slack_webhook = WebhookClient('your-slack-webhook-url')
# Function to send a Slack message
def send_slack_message(review):
    response = slack_webhook.send(text=f"Negative review: {review}")
    if response.status_code == 200:
        print("Slack message sent!")
    else:
        print(f"Slack notification failed: {response.status_code} {response.body}")
This sends a message to a Slack channel, ensuring that your team gets notified when there’s a problematic review.
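If you prefer to post to the incoming webhook from Step 1 without an extra dependency, the request is a JSON body with a `text` field sent via HTTP POST. The sketch below uses only the standard library; `build_slack_payload`, `post_to_webhook`, and the 500-character cap are illustrative assumptions, and the webhook URL is whatever Slack generated for you.

```python
import json
import urllib.request

MAX_REVIEW_CHARS = 500  # assumed cap so that very long reviews do not flood the channel


def build_slack_payload(review, sentiment_score=None):
    """Build the JSON body for a Slack incoming webhook."""
    snippet = review if len(review) <= MAX_REVIEW_CHARS else review[:MAX_REVIEW_CHARS] + "..."
    text = f"Negative review: {snippet}"
    if sentiment_score is not None:
        text += f" (sentiment score: {sentiment_score:.2f})"
    return {"text": text}


def post_to_webhook(webhook_url, payload, timeout=5):
    """POST the payload to the webhook URL and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


payload = build_slack_payload("Terrible customer support, I waited for hours!", 0.12)
```

Keeping payload construction separate from the network call makes the message format easy to check without contacting Slack.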
Handling Lag in Kafka
Real-time pipelines need to handle high data throughput efficiently. Kafka’s consumer groups allow you to scale horizontally by adding more consumers to share the load. Additionally, batch processing can help handle lag when messages are coming in faster than they can be processed.
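Before tuning anything, it helps to measure lag. Per partition, lag is the latest offset written to the log (the high watermark) minus the offset your consumer group has committed. The sketch below shows only the arithmetic; `compute_lag` is a hypothetical helper, and the offsets are made-up sample values. With confluent_kafka, the inputs could be gathered from `Consumer.get_watermark_offsets()` and `Consumer.committed()`.

```python
def compute_lag(high_watermarks, committed_offsets):
    """Return per-partition lag: messages written but not yet committed by the group.

    Both arguments map partition id -> offset. A partition with no committed
    offset is treated as starting from 0, which is an assumption of this sketch.
    """
    return {
        partition: max(high - committed_offsets.get(partition, 0), 0)
        for partition, high in high_watermarks.items()
    }


# Sample values for illustration only
lag = compute_lag({0: 120, 1: 80, 2: 15}, {0: 100, 1: 80})
total_lag = sum(lag.values())
```

A total that keeps growing over time suggests the consumers cannot keep up, which is the signal to add consumers to the group or to batch work.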
Batch Processing Example
batch_size = 10
batch = []
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():  # Skip error events instead of adding them to the batch
        continue
    batch.append(msg.value().decode('utf-8'))
    if len(batch) >= batch_size:
        # Process the entire batch at once
        for review in batch:
            sentiment_score = analyze_sentiment(review)
            if sentiment_score < 0.5 or detect_complaint(review):
                send_slack_message(review)
        batch.clear()  # Clear the batch after processing
In this example, we wait until we’ve accumulated 10 reviews, process them in a batch, and then clear the batch to handle the next set of messages.
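A size-only trigger has one drawback: when traffic is slow, a partially filled batch can sit unprocessed indefinitely. One possible refinement is to also flush after a maximum wait. The `ReviewBatcher` class below is a hypothetical sketch of that idea; the 5-second default is an arbitrary assumption.

```python
import time


class ReviewBatcher:
    """Collect reviews and release them when the batch is full or too old."""

    def __init__(self, max_size=10, max_wait_seconds=5.0, clock=time.monotonic):
        self.max_size = max_size
        self.max_wait_seconds = max_wait_seconds
        self.clock = clock        # injectable so the timing logic can be tested
        self.items = []
        self.started_at = None    # when the first item of the current batch arrived

    def add(self, review):
        """Add a review, starting the batch timer if this is the first item."""
        if not self.items:
            self.started_at = self.clock()
        self.items.append(review)

    def ready(self):
        """True when the batch is full or its oldest item has waited long enough."""
        if not self.items:
            return False
        too_old = self.clock() - self.started_at >= self.max_wait_seconds
        return len(self.items) >= self.max_size or too_old

    def drain(self):
        """Return the current batch and start a new, empty one."""
        batch, self.items = self.items, []
        self.started_at = None
        return batch
```

In the polling loop, you would call `add()` for each decoded message and check `ready()` after every poll, including polls that return `None`, so that an old partial batch still gets processed.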
Advanced Use Case: Using Kafka Streams with Python
While Kafka Streams is a Java-based library, we can achieve similar functionality in Python using Streamz. This allows us to build more complex, stream-based pipelines directly in Python.
Step 1: Install Streamz
pip install streamz
Step 2: Using Streamz for Stream Processing
Here’s how you can use Streamz to build a real-time stream processing pipeline that integrates Kafka and the LLM:
from streamz import Stream
from confluent_kafka import Consumer
# Set up Kafka consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'streamz-consumer-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['store-reviews'])
# Create a Streamz stream to process Kafka messages
source = Stream()
# Define the stream pipeline: score each review, then print the score
source.map(analyze_sentiment) \
    .sink(lambda score: print(f"Sentiment score: {score}"))
# Feed the stream: poll Kafka continuously and emit each review into the pipeline
while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue  # Nothing to process on this poll
    source.emit(msg.value().decode('utf-8'))
This creates a continuous stream where reviews are consumed from Kafka, processed by the LLM, and results are printed or further processed. You can add additional steps like triggering Slack notifications for negative reviews or storing results in a database.
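As an illustration of such an additional step, the sketch below expresses the same map, filter, and sink stages with plain Python generators and routes only negative reviews to an alert callback. It does not use Streamz; the function names are hypothetical, and `fake_analyze` is a stand-in scorer used only so the example runs without a model.

```python
def score_reviews(reviews, analyze):
    """Map step: pair each review with its sentiment score."""
    for review in reviews:
        yield review, analyze(review)


def negative_only(scored_reviews, threshold=0.5):
    """Filter step: keep only reviews scoring below the threshold."""
    for review, score in scored_reviews:
        if score < threshold:
            yield review, score


def run_pipeline(reviews, analyze, alert):
    """Sink step: pass each negative review to the alert callback; return the count."""
    count = 0
    for review, score in negative_only(score_reviews(reviews, analyze)):
        alert(review, score)
        count += 1
    return count


# Stand-in scorer for demonstration only; the real pipeline would call analyze_sentiment.
def fake_analyze(review):
    return 0.1 if "terrible" in review.lower() else 0.9


alerts = []
sent = run_pipeline(
    ["Amazing experience!", "Terrible customer support."],
    fake_analyze,
    lambda review, score: alerts.append((review, score)),
)
```

In the real pipeline, the alert callback could wrap `send_slack_message`, and a second sink could write every scored review to a database.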
Conclusion
In this guide, we’ve built a real-time data processing solution using Python, Kafka, and a fine-tuned LLM. We demonstrated how to process real-time store reviews, perform sentiment analysis, detect complaints, and send Slack notifications for negative feedback. We also explored handling lag and building more complex stream-processing pipelines using Streamz for Kafka Streams-like functionality.
This real-time pipeline is scalable and adaptable to other use cases such as fraud detection, anomaly detection, or customer feedback management. With Kafka for data ingestion and an LLM for intelligent processing, you can build powerful real-time solutions powered by GenAI.