- Lab
- A Cloud Guru
Working with Stream Processing in Kafka
Kafka Streams lets us perform powerful data processing operations on Kafka data in real time. In this lab, we will work with Kafka Streams by building a Java application that transforms data about individual purchases into a running total of each item that was bought. In addition to general stream processing, this lab provides hands-on experience with stateless and stateful transformations, data aggregation, and type conversion.
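To make the goal concrete, the transformation described above can be sketched in plain Java, without Kafka. This is only a simulation under stated assumptions: the class name RunningTotalSketch and the sample products are hypothetical, and a HashMap-style map stands in for the state that a stream processing application would keep per key. Each incoming purchase updates the total for its item, and the updated total is what a downstream reader would see.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation only; RunningTotalSketch is a hypothetical name,
// not part of the lab project or of the Kafka Streams API.
class RunningTotalSketch {

    // Each purchase is a {product, quantity} pair of Strings.
    // Returns one "product:total" update per input record.
    static List<String> runningTotals(List<String[]> purchases) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String[] purchase : purchases) {
            String product = purchase[0];
            int quantity = Integer.parseInt(purchase[1]);
            // The first quantity for a product becomes its total;
            // later quantities are added to the stored total.
            int total = totals.merge(product, quantity, Integer::sum);
            updates.add(product + ":" + total);
        }
        return updates;
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        List<String[]> purchases = Arrays.asList(
            new String[] {"apples", "3"},
            new String[] {"pears", "2"},
            new String[] {"apples", "4"});
        System.out.println(runningTotals(purchases)); // [apples:3, pears:2, apples:7]
    }
}
```

Note that the second apples purchase produces a new total of 7 rather than a separate entry; this per-key memory is what makes the aggregation stateful.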
Challenge: Clone the Starter Project from GitHub and Perform a Test Run
- Clone the starter project from GitHub:
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams-lab.git
- Perform a test to ensure that the code can compile and run:
cd content-ccdak-kafka-streams-lab/
./gradlew run
The output should contain the message printed by the Main class: Hello, world!
Challenge: Implement and Run the Kafka Streams Application
- Access the build.gradle file that must be edited:
vi build.gradle
- Add the following dependencies to the dependencies {...} section of the file:
implementation 'org.apache.kafka:kafka-streams:2.2.1'
implementation 'org.apache.kafka:kafka-clients:2.2.1'
...
- Implement our streams logic in Main.java:
vi src/main/java/com/linuxacademy/ccdak/streams/Main.java
Note: Remember to check the code comments for some helpful info, or the solution video for a more thorough explanation.
Here is an example of Main.java:

package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class Main {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("inventory_purchases");

        // Convert the value from String to Integer. If the value is not a properly-formatted number,
        // print a message and set it to 0.
        final KStream<String, Integer> integerValuesSource = source.mapValues(value -> {
            try {
                return Integer.valueOf(value);
            } catch (NumberFormatException e) {
                System.out.println("Unable to convert to Integer: \"" + value + "\"");
                return 0;
            }
        });

        // Group by the key and reduce to provide a total quantity for each key.
        final KTable<String, Integer> productCounts = integerValuesSource
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .reduce((total, newQuantity) -> total + newQuantity);

        // Output to the output topic.
        productCounts
            .toStream()
            .to("total_purchases", Produced.with(Serdes.String(), Serdes.Integer()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }
}
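The String-to-Integer conversion inside mapValues is the stateless step of the example, and its fallback behavior can be tried in isolation. The following is a minimal plain-Java sketch, assuming a hypothetical helper class named QuantityParser that is not part of the lab project; it copies the same try/catch logic so the edge cases are easy to see.

```java
// Plain-Java illustration only; QuantityParser is a hypothetical name,
// not part of the lab project.
class QuantityParser {

    // Mirrors the lambda passed to mapValues: parse the value,
    // or print a message and fall back to 0.
    static int parseQuantity(String value) {
        try {
            return Integer.valueOf(value);
        } catch (NumberFormatException e) {
            System.out.println("Unable to convert to Integer: \"" + value + "\"");
            return 0;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseQuantity("5"));   // 5
        System.out.println(parseQuantity("abc")); // prints the message, then 0
        System.out.println(parseQuantity(" 5"));  // prints the message, then 0
    }
}
```

Integer.valueOf does not trim whitespace, so a value such as " 5" is treated as malformed and counted as 0. Because a fallback of 0 leaves the running total unchanged, malformed records are effectively ignored by the aggregation rather than stopping the application.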
- Run the code:
./gradlew run
- We can check the output of the application by consuming from the output topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic total_purchases --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
- If we want to visually compare the output with the input, in a new terminal window, we can consume from the input topic with:
kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --property print.key=true
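The consumer command for total_purchases sets value.deserializer because the application writes its totals with Serdes.Integer(), which stores each value as four big-endian bytes rather than as text. The input topic holds plain strings, so its command needs no such option. The byte layout can be sketched in plain Java with ByteBuffer; IntegerWireFormat is a hypothetical name used for illustration, and this is not Kafka's own serializer class.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Plain-Java illustration only; IntegerWireFormat is a hypothetical name.
// It reproduces the 4-byte big-endian layout, not Kafka's serializer classes.
class IntegerWireFormat {

    // Encode an int as 4 bytes, most significant byte first
    // (ByteBuffer is big-endian by default).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decode 4 big-endian bytes back into an int.
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(12);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 12]
        System.out.println(decode(bytes));          // 12
    }
}
```

Read as text, those four bytes are control characters, which is why the totals would likely look garbled in the console consumer without the IntegerDeserializer setting.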