Using Schema Registry in a Kafka Application
Confluent Schema Registry lets you serialize and deserialize complex data objects, and manage and enforce data contracts between producers and consumers. In this hands-on lab, you will build a full application that uses Confluent Schema Registry: you will create a schema, then build a producer and a consumer that use that schema to serialize and deserialize data.
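For intuition about how a serialized message stays tied to its schema, here is a minimal sketch of the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. It uses only the Java standard library and is not the Confluent serializer itself. The class name, the helper names, and the schema ID 42 are hypothetical and chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatSketch {

    // Confluent wire format: magic byte 0, 4-byte big-endian schema ID, Avro payload.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    // Prepend the header to an already Avro-encoded payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId); // ByteBuffer is big-endian by default
        buf.put(avroPayload);
        return buf.array();
    }

    // Read the schema ID a consumer would use to look up the writer schema.
    static int schemaIdOf(byte[] message) {
        if (message.length < HEADER_LENGTH || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Schema Registry framed message");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Return the Avro payload that follows the header.
    static byte[] payloadOf(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        byte[] payload = {0x02, 0x0c};       // placeholder bytes standing in for Avro data
        byte[] message = frame(42, payload); // 42 is a hypothetical schema ID
        System.out.println("schema id = " + schemaIdOf(message));
        System.out.println("payload length = " + payloadOf(message).length);
    }
}
```

Because only the schema ID travels with each record, the full schema is stored once in the registry rather than repeated in every message.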
Clone the Starter Project and Run it to Make Sure Everything Is Working
- Clone the starter project into the home directory:
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry-lab.git
- Run the code to ensure it works before modifying it:
cd content-ccdak-schema-registry-lab/
./gradlew runProducer
./gradlew runConsumer
Note: We should see a Hello, world! message in the output for both the producer and the consumer.
Implement the Producer and Consumer Using an Avro Schema
- Create the directory for Avro schemas:
mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
- Create a schema definition for purchases:
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc
{
  "namespace": "com.linuxacademy.ccdak.schemaregistry",
  "type": "record",
  "name": "Purchase",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
    {"name": "quantity", "type": "int"}
  ]
}
- Implement the producer:
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ProducerMain.java
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // Keys are plain strings; values are Avro records serialized via Schema Registry.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        KafkaProducer<String, Purchase> producer = new KafkaProducer<String, Purchase>(props);

        // Purchase is the class generated from Purchase.avsc; the purchase ID is used as the record key.
        Purchase apples = new Purchase(1, "apples", 17);
        producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", apples.getId().toString(), apples));

        Purchase oranges = new Purchase(2, "oranges", 5);
        producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", oranges.getId().toString(), oranges));

        // close() waits for the pending sends to complete before returning.
        producer.close();
    }

}
- Implement the consumer:
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ConsumerMain.java
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // Deserialize values into the generated Purchase class instead of a GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, Purchase> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("inventory_purchases"));

        try {
            // Open the output file in append mode.
            BufferedWriter writer = new BufferedWriter(new FileWriter("/home/cloud_user/output/output.txt", true));
            // Poll until the process is stopped, printing each record and writing it to the file.
            while (true) {
                final ConsumerRecords<String, Purchase> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Purchase> record : records) {
                    final String key = record.key();
                    final Purchase value = record.value();
                    String outputString = "key=" + key + ", value=" + value;
                    System.out.println(outputString);
                    writer.write(outputString + "\n");
                }
                writer.flush();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
- Run the producer:
./gradlew runProducer
- Run the consumer:
./gradlew runConsumer
- Verify the data in the output file:
cat /home/cloud_user/output/output.txt
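To make the Purchase schema more concrete, the following sketch hand-encodes a purchase using the Avro binary encoding rules: int values are written as zigzag variable-length integers, strings as a length followed by UTF-8 bytes, and a record as its fields in schema order. It uses only the Java standard library and is an illustration of the encoding, not the code path KafkaAvroSerializer actually runs. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroEncodingSketch {

    // Avro int: zigzag-encode, then write 7 bits per byte, low bits first.
    static void writeInt(ByteArrayOutputStream out, int n) {
        int z = (n << 1) ^ (n >> 31);
        while ((z & ~0x7F) != 0) {
            out.write((z & 0x7F) | 0x80); // high bit set: more bytes follow
            z >>>= 7;
        }
        out.write(z);
    }

    // Avro string: byte length (same varint form for small values), then UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is its fields concatenated in schema order: id, product, quantity.
    static byte[] encodePurchase(int id, String product, int quantity) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt(out, id);
        writeString(out, product);
        writeInt(out, quantity);
        return out.toByteArray();
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", bytes[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Same values as the "apples" purchase sent by the producer.
        System.out.println(toHex(encodePurchase(1, "apples", 17)));
        // prints: 02 0c 61 70 70 6c 65 73 22
    }
}
```

The encoded bytes carry no field names or types, which is why the consumer needs the schema from the registry to read them back into a Purchase.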