HowToDoInJava

Spring Boot Kafka JsonSerializer Example

Learn to use the JsonSerializer and JsonDeserializer classes from Spring Kafka to store Java model objects as JSON in Apache Kafka topics and read them back as Java objects.

1. Prerequisites

  • Please follow this guide to set up Kafka on your machine.
  • We are modifying the Spring Boot and Kafka hello world application.
  • Also make sure that your machine has at least Java 8 and Maven installed.

2. Application Configuration

In the application.properties file, we have added the following configuration.

server.port=9000

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

  • spring.kafka.consumer.key-deserializer specifies the deserializer class for keys.
  • spring.kafka.consumer.value-deserializer specifies the deserializer class for values.
  • spring.kafka.consumer.properties.spring.json.trusted.packages specifies a comma-delimited list of package patterns allowed for deserialization. '*' trusts all packages, which is convenient for a demo but broader than most applications need.
  • spring.kafka.producer.key-serializer specifies the serializer class for keys.
  • spring.kafka.producer.value-serializer specifies the serializer class for values.
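
For reference, the consumer settings above correspond to plain Kafka client property names. The following is a minimal JDK-only sketch of that mapping, not the application's actual configuration code. The class name ConsumerProps and the package com.example.kafka.model are hypothetical, and the sketch assumes you want to trust a single package instead of '*'.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: mirrors the consumer entries of application.properties
// using the raw property names read by the Kafka client and JsonDeserializer.
final class ConsumerProps {

    static Map<String, Object> build(String trustedPackages) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-id");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Assumption: only classes under this package should be deserialized.
        props.put("spring.json.trusted.packages", trustedPackages);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties, one per line.
        build("com.example.kafka.model")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If you configure Kafka in Java rather than through application.properties, a map like this could be handed to a consumer factory such as Spring Kafka's DefaultKafkaConsumerFactory.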

3. Model class

We have created a User class, which we will send to Kafka. Its instances are serialized by JsonSerializer to a byte array, and Kafka stores this byte array in a partition of the given topic.

During deserialization, JsonDeserializer receives the JSON from Kafka as a byte array and returns a User object to the application.

public class User 
{
	private long userId;
	private String firstName;
	private String lastName;

	public long getUserId() {
		return userId;
	}
	public void setUserId(long userId) {
		this.userId = userId;
	}
	public String getFirstName() {
		return firstName;
	}
	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}
	public String getLastName() {
		return lastName;
	}
	public void setLastName(String lastName) {
		this.lastName = lastName;
	}
	
	@Override
	public String toString() {
		return "User [userId=" + userId + ", firstName=" 
		        + firstName + ", lastName=" + lastName + "]";
	}
}
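
To make the byte-array round trip concrete, here is a JDK-only illustration of roughly what the record value looks like for a User. It is a sketch under assumptions, not how Spring Kafka does it: the real JsonSerializer delegates to Jackson and, by default, also adds type information in record headers, which is what the trusted packages setting guards on the consumer side. The class UserPayloadSketch and its methods are hypothetical, and the hand-written JSON does no escaping.

```java
import java.nio.charset.StandardCharsets;

// JDK-only illustration of the payload shape; the real JsonSerializer
// delegates to Jackson and is not reimplemented here.
final class UserPayloadSketch {

    // Hand-written JSON for the three User fields (no escaping: demo values only).
    static String toJson(long userId, String firstName, String lastName) {
        return "{\"userId\":" + userId
                + ",\"firstName\":\"" + firstName + "\""
                + ",\"lastName\":\"" + lastName + "\"}";
    }

    // Kafka stores record values as bytes; UTF-8 encoding is assumed here.
    static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // The consumer side starts from the same bytes and decodes them back to text.
    static String fromBytes(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = toBytes(toJson(1L, "Lokesh", "Gupta"));
        System.out.println(value.length + " bytes: " + fromBytes(value));
    }
}
```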

4. Kafka Producer

The producer API accepts the user information through an HTTP POST endpoint. It then creates a new User object and sends it to Kafka using KafkaTemplate.

@PostMapping(value = "/createUser")
public void sendMessageToKafkaTopic(
		@RequestParam("userId") long userId, 
		@RequestParam("firstName") String firstName,
		@RequestParam("lastName") String lastName) {
	
	User user = new User();
	user.setUserId(userId);
	user.setFirstName(firstName);
	user.setLastName(lastName);
	
	this.producerService.saveCreateUserLog(user);
}

The controller delegates to the producer service, which publishes the User object to the topic:

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void saveCreateUserLog(User user) 
{
	logger.info(String.format("User created -> %s", user));
	this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);
}

5. Kafka Consumer

The consumer is implemented with @KafkaListener, which is invoked every time a new record is added to the topic.

@KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG, 
				groupId = AppConstants.GROUP_ID)
public void consume(User user) 
{
	logger.info(String.format("User created -> %s", user));
}
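
The producer and consumer both refer to AppConstants, which is not listed in this article. A minimal sketch of such a class might look like the following. The topic name "user-log" is a placeholder chosen for illustration, and the group id is assumed to match spring.kafka.consumer.group-id from application.properties; the actual values in the project may differ.

```java
// Hypothetical constants holder: the article references these names
// but does not show their values.
final class AppConstants {

    // Assumed topic name, for illustration only.
    public static final String TOPIC_NAME_USER_LOG = "user-log";

    // Assumed to match spring.kafka.consumer.group-id in application.properties.
    public static final String GROUP_ID = "group-id";

    // Not meant to be instantiated.
    private AppConstants() {
    }
}
```

Because both fields are compile-time constants, they can be used as annotation attribute values, as in @KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG).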

6. Test

Use any REST API client and post a few messages to the API http://localhost:9000/kafka/createUser as below.

Message post: http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta
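
If you prefer to send the request from code, the sketch below builds the same URL and posts to it using only JDK classes available since Java 8. The class name CreateUserClient and its methods are hypothetical; it assumes the application is running locally on port 9000 and that the endpoint accepts a POST with an empty body and query parameters, as in the example above.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Hypothetical test client for the /kafka/createUser endpoint.
final class CreateUserClient {

    static final String BASE = "http://localhost:9000/kafka/createUser";

    // Builds the request URL, URL-encoding the two string parameters.
    static String buildUrl(long userId, String firstName, String lastName) {
        return BASE + "?userId=" + userId
                + "&firstName=" + encode(firstName)
                + "&lastName=" + encode(lastName);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 should always be available", e);
        }
    }

    // Sends a POST with an empty body and returns the HTTP status code.
    static int post(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.getOutputStream().close(); // no body: parameters travel in the query string
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Only prints the URL; call post(...) once the application is running.
        System.out.println(buildUrl(1L, "Lokesh", "Gupta"));
    }
}
```

With the application started, calling CreateUserClient.post(CreateUserClient.buildUrl(1L, "Lokesh", "Gupta")) should trigger the producer and consumer log lines.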

Observe the console logs:

2020-05-24 23:36:47.132  INFO 2092 --- [nio-9000-exec-4] 
2020-05-26 01:03:52.722  INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService  : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]

2020-05-26 01:03:52.729  INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService  : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]

7. Conclusion

In this Spring Boot Kafka JsonSerializer example, we learned to use JsonSerializer and JsonDeserializer to serialize Java objects to JSON, store them in Kafka, and deserialize them back into Java objects.

Happy Learning !!

About Lokesh Gupta

A family guy with fun loving nature. Love computers, programming and solving everyday problems. Find me on Facebook and Twitter.

Copyright © 2020 · HowToDoInjava.com · All Rights Reserved. | Sitemap
