spring.kafka.consumer.key-deserializer specifies the deserializer class for keys.
spring.kafka.consumer.value-deserializer specifies the deserializer class for values.
spring.kafka.consumer.properties.spring.json.trusted.packages specifies the comma-delimited list of package patterns allowed for deserialization. '*' means that all packages are trusted.
spring.kafka.producer.key-serializer specifies the serializer class for keys.
spring.kafka.producer.value-serializer specifies the serializer class for values.
3. Model class
We have created a User class whose instances we will send to Kafka. Each instance is serialized by JsonSerializer to a byte array, which Kafka then stores in the target partition.
During deserialization, JsonDeserializer receives the JSON from Kafka as a byte array and returns a User object to the application.
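As a minimal sketch of what such a model class might look like: the field names are taken from the controller parameters used later in this example, and the toString() format is inferred from the console logs in the demo. The class is kept package-private only so the sketch fits in a single file; in a real project it would normally be public and live in its own User.java. The no-argument constructor and the getters and setters are what JSON (de)serialization typically relies on.

```java
// Hypothetical reconstruction of the model class; not the original source file.
class User {

    private long userId;
    private String firstName;
    private String lastName;

    // The implicit no-argument constructor lets the deserializer
    // create an empty instance and populate it through the setters.

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    // Format assumed from the log lines shown in the demo section.
    @Override
    public String toString() {
        return "User [userId=" + userId
                + ", firstName=" + firstName
                + ", lastName=" + lastName + "]";
    }
}
```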
4. Kafka Producer
The producer API accepts the user details through an HTTP POST endpoint. It then creates a new User object and sends it to Kafka using KafkaTemplate.
KafkaProducerController.java
@PostMapping(value = "/createUser")
public void sendMessageToKafkaTopic(@RequestParam("userId") long userId,
        @RequestParam("firstName") String firstName,
        @RequestParam("lastName") String lastName) {
    User user = new User();
    user.setUserId(userId);
    user.setFirstName(firstName);
    user.setLastName(lastName);
    this.producerService.saveCreateUserLog(user);
}
KafKaProducerService.java
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void saveCreateUserLog(User user) {
    logger.info(String.format("User created -> %s", user));
    this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);
}
5. Kafka Consumer
The consumer is implemented as a @KafkaListener method, which is invoked every time a new record is added to the topic.
@KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG,
        groupId = AppConstants.GROUP_ID)
public void consume(User user) {
    logger.info(String.format("User created -> %s", user));
}
6. Demo
Use any REST client to post a few messages to the API at http://localhost:9000/kafka/createUser, as shown below.
Message post : http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta
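If you prefer to script the call instead of using a REST client, the same POST request could be built with the JDK's java.net.http API (Java 11+). This is only a sketch: the class name CreateUserRequestSketch and the helper buildRequest are hypothetical, and the URL assumes the application is running locally on port 9000 as in this demo.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the class and method names are illustrative only.
class CreateUserRequestSketch {

    // Endpoint taken from the demo; adjust host and port to your setup.
    static final String CREATE_USER_URL = "http://localhost:9000/kafka/createUser";

    // Builds a POST request that carries the user fields as query parameters,
    // matching the @RequestParam arguments of the controller method.
    static HttpRequest buildRequest(long userId, String firstName, String lastName) {
        String query = "userId=" + userId
                + "&firstName=" + URLEncoder.encode(firstName, StandardCharsets.UTF_8)
                + "&lastName=" + URLEncoder.encode(lastName, StandardCharsets.UTF_8);

        return HttpRequest.newBuilder(URI.create(CREATE_USER_URL + "?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

Passing the result of buildRequest(1L, "Lokesh", "Gupta") to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) would issue the call against the running application.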
Observe the console logs:
2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4]
2020-05-26 01:03:52.722 INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
2020-05-26 01:03:52.729 INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
7. Conclusion
In this Spring Boot Kafka JsonSerializer example, we learned to use JsonSerializer and JsonDeserializer to serialize Java objects to JSON, store them in Kafka, and deserialize them back into objects.