Learn to use the JsonSerializer and JsonDeserializer classes to store JSON in Apache Kafka topics and read it back as Java model objects.
1. Prerequisites
- Please follow this guide to set up Kafka on your machine.
- We are modifying the Spring Boot and Kafka hello world application.
- Also, ensure that your machine has at least Java 8 and Maven installed.
2. Application Configuration
In the application.properties file, we have added the following configuration.
server.port=9000
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
- spring.kafka.consumer.key-deserializer specifies the deserializer class for keys.
- spring.kafka.consumer.value-deserializer specifies the deserializer class for values.
- spring.kafka.consumer.properties.spring.json.trusted.packages specifies the comma-delimited list of package patterns allowed for deserialization. '*' means all packages are trusted.
- spring.kafka.producer.key-serializer specifies the serializer class for keys.
- spring.kafka.producer.value-serializer specifies the serializer class for values.
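To make the mapping concrete, here is a rough, dependency-free sketch of the plain Kafka client properties that the consumer settings above roughly correspond to. Spring Boot translates the spring.kafka.consumer.* keys into the client's own names, and anything under spring.kafka.consumer.properties.* is passed through as-is. The class name ConsumerPropsSketch and the package com.example.kafka.model are assumptions made for illustration; the trusted package is narrowed from '*' only to show what a non-wildcard value looks like.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Hypothetical package that would hold the User model; not taken from the article.
    public static final String MODEL_PACKAGE = "com.example.kafka.model";

    // Builds the Kafka client property map that mirrors the consumer section
    // of application.properties.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-id");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Passed through unchanged from spring.kafka.consumer.properties.*
        props.put("spring.json.trusted.packages", MODEL_PACKAGE);
        return props;
    }

    public static void main(String[] args) {
        // Print each property so the mapping can be compared with the file above.
        consumerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In an application that configures Kafka in Java instead of application.properties, a map like this would typically be handed to spring-kafka's DefaultKafkaConsumerFactory.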
3. Model class
We have created the User class, which we will send to Kafka. Its instances are serialized by JsonSerializer to a byte array, and Kafka stores this byte array in the target partition.
During deserialization, JsonDeserializer receives the JSON from Kafka as a byte array and returns a User object to the application.
public class User
{
    private long userId;
    private String firstName;
    private String lastName;

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "User [userId=" + userId + ", firstName="
                + firstName + ", lastName=" + lastName + "]";
    }
}
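The object-to-bytes-and-back path described above can be sketched without any libraries. This is only an illustration of the idea: the real JsonSerializer and JsonDeserializer delegate to Jackson, whereas the sketch below uses a hand-written formatter and a regular expression as stand-ins. The class UserJsonRoundTrip and its trimmed-down nested User are hypothetical, and the sketch assumes the names contain no quotes or backslashes.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UserJsonRoundTrip {

    // Trimmed-down stand-in for the article's User model (public fields for brevity).
    public static class User {
        public final long userId;
        public final String firstName;
        public final String lastName;

        public User(long userId, String firstName, String lastName) {
            this.userId = userId;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    private static final Pattern USER_JSON = Pattern.compile(
            "\\{\"userId\":(-?\\d+),\"firstName\":\"([^\"]*)\",\"lastName\":\"([^\"]*)\"\\}");

    // Producer side: object -> JSON text -> UTF-8 bytes (the payload Kafka stores).
    public static byte[] serialize(User user) {
        String json = "{\"userId\":" + user.userId
                + ",\"firstName\":\"" + user.firstName
                + "\",\"lastName\":\"" + user.lastName + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: UTF-8 bytes -> JSON text -> object.
    public static User deserialize(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        Matcher matcher = USER_JSON.matcher(json);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected payload: " + json);
        }
        return new User(Long.parseLong(matcher.group(1)), matcher.group(2), matcher.group(3));
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new User(1L, "Lokesh", "Gupta"));
        User copy = deserialize(payload);
        System.out.println(payload.length + " bytes -> " + copy.firstName + " " + copy.lastName);
    }
}
```

The point of the sketch is that Kafka itself only ever sees the byte array; turning it back into a User is entirely the deserializer's job on the consumer side.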
4. Kafka Producer
The producer API accepts the user information through an HTTP POST endpoint. It then creates a new User object and sends it to Kafka using KafkaTemplate.
// REST controller: builds a User from the request parameters
@PostMapping(value = "/createUser")
public void sendMessageToKafkaTopic(
        @RequestParam("userId") long userId,
        @RequestParam("firstName") String firstName,
        @RequestParam("lastName") String lastName) {
    User user = new User();
    user.setUserId(userId);
    user.setFirstName(firstName);
    user.setLastName(lastName);
    this.producerService.saveCreateUserLog(user);
}

// Producer service: logs the User and publishes it to the topic
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void saveCreateUserLog(User user)
{
    logger.info(String.format("User created -> %s", user));
    this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);
}
5. Kafka Consumer
The consumer is implemented as a @KafkaListener method, which is notified every time a new entry is added to the topic.
@KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG,
        groupId = AppConstants.GROUP_ID)
public void consume(User user)
{
    logger.info(String.format("User created -> %s", user));
}
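Both the producer and the listener refer to AppConstants, which is not shown in this article. A minimal sketch of what such a class might look like follows. The constant values are assumptions: "user-log" is a made-up topic name, and "group-id" simply mirrors the spring.kafka.consumer.group-id value from the configuration.

```java
public final class AppConstants {

    // Hypothetical topic name; the article does not state the real one.
    public static final String TOPIC_NAME_USER_LOG = "user-log";

    // Assumed to match spring.kafka.consumer.group-id in application.properties.
    public static final String GROUP_ID = "group-id";

    // Constants holder, not meant to be instantiated.
    private AppConstants() {
    }
}
```

The fields need to be static final String literals because annotation attributes such as topics and groupId in @KafkaListener only accept compile-time constants.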
6. Demo
Use any REST API tester and post a few messages to the API http://localhost:9000/kafka/createUser as below.
Message post : http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta
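If no REST tester is at hand, the same request can be issued from a few lines of plain Java. The sketch below is one possible way to do it with the JDK's HttpURLConnection; the class name CreateUserClient and its helper methods are made up for this example, and it assumes the application is running locally on port 9000 as configured earlier.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class CreateUserClient {

    public static final String BASE_URL = "http://localhost:9000/kafka/createUser";

    // Builds the request URL with the three parameters the endpoint expects.
    public static String buildUrl(long userId, String firstName, String lastName) {
        return BASE_URL
                + "?userId=" + userId
                + "&firstName=" + encode(firstName)
                + "&lastName=" + encode(lastName);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be supported by every JVM.
            throw new IllegalStateException(e);
        }
    }

    // Sends an empty-bodied POST and returns the HTTP status code.
    public static int post(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.getOutputStream().close();
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String url = buildUrl(1, "Lokesh", "Gupta");
        System.out.println("POST " + url + " -> HTTP " + post(url));
    }
}
```

The parameters travel in the query string, which matches the @RequestParam arguments of the controller method, so the request body can stay empty.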
Observe the console logs:
2020-05-26 01:03:52.722 INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
2020-05-26 01:03:52.729 INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
7. Conclusion
In this Spring Boot Kafka JsonSerializer example, we learned to use JsonSerializer and JsonDeserializer to serialize and deserialize Java objects and store them in Kafka.
Happy Learning !!