Learn to configure multiple consumers listening to different Kafka topics in a Spring Boot application using Java-based bean configurations.
1. Objective
In this Kafka tutorial, we will learn the following:
- Configuring Kafka in Spring Boot
- Using Java configuration for Kafka
- Configuring multiple Kafka consumers and producers
- Configuring each consumer to listen to a separate topic
- Configuring each producer to publish to a separate topic
- Sending strings (StringSerializer) as well as custom objects (JsonSerializer) as payloads
2. Dependencies
The project is a Maven project and includes the following dependencies:
- spring-boot-starter-web: for creating REST APIs or the user interface.
- spring-kafka: contains Spring classes, interfaces and annotations for interacting with the Kafka broker and other messaging functionalities.
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
  <java.version>1.8</java.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
</dependencies>
3. Kafka Topics Configuration
We are creating two topics, i.e. test-log and user-log.
- test-log: is used for publishing simple string messages.
- user-log: is used for publishing serialized User objects.
Let’s note down a few crucial points.
- Spring Kafka will automatically add topics for all beans of type NewTopic.
- Using TopicBuilder, we can create new topics as well as refer to existing topics in Kafka.
- Apart from the topic name, we can specify the number of partitions and the number of replicas for the topic.
- By default, it uses 1 for both the number of partitions and the replication factor.
- If you are not using Spring Boot, then make sure to create the KafkaAdmin bean as well. Spring Boot creates it for us.
Make sure to verify the number of partitions given for any Kafka topic. If the number of partitions given is greater than the existing number of partitions for that topic in the Kafka broker, the new number will be applied, and more partitions will be added.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class TopicConfig
{
  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  @Value(value = "${general.topic.name}")
  private String topicName;

  @Value(value = "${user.topic.name}")
  private String userTopicName;

  @Bean
  public NewTopic generalTopic() {
    return TopicBuilder.name(topicName)
        .partitions(1)
        .replicas(1)
        .build();
  }

  @Bean
  public NewTopic userTopic() {
    return TopicBuilder.name(userTopicName)
        .partitions(1)
        .replicas(1)
        .build();
  }

  //If not using spring boot
  @Bean
  public KafkaAdmin kafkaAdmin()
  {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
  }
}
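The @Value placeholders used in this class, and in the consumer and producer classes, need matching entries in application.properties. The tutorial does not show that file, so the fragment below is only a sketch: the topic names come from this section, while the broker address (a local single broker) and the two group ids are assumed values that you should replace with your own.

```properties
# Broker address: an assumption for a local, single-broker setup
kafka.bootstrapAddress=localhost:9092

# Topic names as described in this section
general.topic.name=test-log
user.topic.name=user-log

# Consumer group ids: hypothetical values, pick your own
general.topic.group.id=general-log-group
user.topic.group.id=user-log-group
```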
4. Message Consumers Configuration
We are creating two consumers that will listen to the two topics we created in the 3rd section (topic configuration).
The Kafka multiple consumer configuration involves the following classes:
- DefaultKafkaConsumerFactory : is used to create new Consumer instances where all consumers share common configuration properties mentioned in this bean.
- ConcurrentKafkaListenerContainerFactory : is used to build ConcurrentMessageListenerContainer. This factory is primarily for building containers for @KafkaListener annotated methods.
- ConsumerConfig : holds the consumer configuration keys.
- @KafkaListener : marks a method to be the target of a Kafka message listener on the specified topics.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.howtodoinjava.kafka.demo.model.User;

@Configuration
public class KafkaConsumerConfig
{
  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  @Value(value = "${general.topic.group.id}")
  private String groupId;

  @Value(value = "${user.topic.group.id}")
  private String userGroupId;

  // 1. Consume string data from Kafka
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String>
      kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  // 2. Consume user objects from Kafka
  public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props,
        new StringDeserializer(),
        new JsonDeserializer<>(User.class));
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, User>
      userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
  }
}
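The configuration above imports a User model class that is not listed in this tutorial. As a rough sketch of what such a payload class could look like, the following POJO uses assumed field names (userId, firstName, lastName); only the class name comes from the tutorial. The package declaration is left out to keep the sketch standalone; in the project it would live in com.howtodoinjava.kafka.demo.model. The no-argument constructor and the setters are there because the JSON deserializer typically needs them to rebuild the object.

```java
// Hypothetical shape of the User payload. The original model class is not shown,
// so every field below is an assumption made for illustration.
public class User {

  private long userId;
  private String firstName;
  private String lastName;

  // No-argument constructor so that a JSON library can instantiate the class.
  public User() {
  }

  public User(long userId, String firstName, String lastName) {
    this.userId = userId;
    this.firstName = firstName;
    this.lastName = lastName;
  }

  public long getUserId() {
    return userId;
  }

  public void setUserId(long userId) {
    this.userId = userId;
  }

  public String getFirstName() {
    return firstName;
  }

  public void setFirstName(String firstName) {
    this.firstName = firstName;
  }

  public String getLastName() {
    return lastName;
  }

  public void setLastName(String lastName) {
    this.lastName = lastName;
  }

  // Used by the "%s"-style log statements in the consumer and producer services.
  @Override
  public String toString() {
    return "User [userId=" + userId + ", firstName=" + firstName
        + ", lastName=" + lastName + "]";
  }
}
```

With a readable toString() in place, the log lines written by the listener and the producer callbacks show the field values instead of an object hash.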
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.howtodoinjava.kafka.demo.model.User;

@Service
public class KafKaConsumerService
{
  private final Logger logger
      = LoggerFactory.getLogger(KafKaConsumerService.class);

  // Listens to the string topic using the default container factory
  @KafkaListener(topics = "${general.topic.name}",
      groupId = "${general.topic.group.id}")
  public void consume(String message) {
    logger.info("Message received -> {}", message);
  }

  // Listens to the user topic using the JSON-aware container factory
  @KafkaListener(topics = "${user.topic.name}",
      groupId = "${user.topic.group.id}",
      containerFactory = "userKafkaListenerContainerFactory")
  public void consume(User user) {
    logger.info("User created -> {}", user);
  }
}
Finally, do not forget to enable Kafka listener annotated endpoints (@KafkaListener) using the @EnableKafka annotation.
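One way to do that is to put the annotation on any @Configuration class. The sketch below uses a separate, hypothetical class named KafkaListenerEnablingConfig purely for illustration; adding @EnableKafka to the existing KafkaConsumerConfig class would work in the same way.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

// Hypothetical class name; it only carries the annotation that switches on
// detection of @KafkaListener methods on Spring-managed beans.
@EnableKafka
@Configuration
public class KafkaListenerEnablingConfig
{
}
```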
5. Message Producers Configuration
We are creating two producers that will be producing and sending messages to two different topics we created in the 3rd section (topic configuration).
The Kafka multiple producers configuration involves the following classes:
- DefaultKafkaProducerFactory : is used to create singleton Producer instances for the provided config options.
- KafkaTemplate : is used for executing send message operations in all supported ways.
- ProducerConfig : holds the producer configuration keys.
- ListenableFuture : is used to accept completion callbacks after sending messages.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.howtodoinjava.kafka.demo.model.User;

@Configuration
public class KafkaProducerConfig
{
  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  //1. Send string to Kafka
  @Bean
  public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  //2. Send User objects to Kafka
  @Bean
  public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  @Bean
  public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
  }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.howtodoinjava.kafka.demo.model.User;

@Service
public class KafKaProducerService
{
  private static final Logger logger =
      LoggerFactory.getLogger(KafKaProducerService.class);

  //1. General topic with a string payload
  @Value(value = "${general.topic.name}")
  private String topicName;

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  //2. Topic with user object payload
  @Value(value = "${user.topic.name}")
  private String userTopicName;

  @Autowired
  private KafkaTemplate<String, User> userKafkaTemplate;

  public void sendMessage(String message)
  {
    ListenableFuture<SendResult<String, String>> future
        = this.kafkaTemplate.send(topicName, message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
      @Override
      public void onSuccess(SendResult<String, String> result) {
        logger.info("Sent message: " + message
            + " with offset: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        logger.error("Unable to send message : " + message, ex);
      }
    });
  }

  public void saveCreateUserLog(User user)
  {
    ListenableFuture<SendResult<String, User>> future
        = this.userKafkaTemplate.send(userTopicName, user);

    future.addCallback(new ListenableFutureCallback<SendResult<String, User>>() {
      @Override
      public void onSuccess(SendResult<String, User> result) {
        logger.info("User created: "
            + user + " with offset: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        // Log the failure distinctly from the success message above
        logger.error("Unable to send user log : " + user, ex);
      }
    });
  }
}
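The success and failure callbacks in this service can also be tried out without a broker. The sketch below is only an illustration of the callback pattern, using the JDK's CompletableFuture, which is what KafkaTemplate.send returns in Spring for Apache Kafka 3.x in place of ListenableFuture. The class SendCallbackSketch and the type FakeSendResult are hypothetical stand-ins and are not part of Spring Kafka.

```java
import java.util.concurrent.CompletableFuture;

public class SendCallbackSketch {

  // Hypothetical stand-in for SendResult; it carries only the record offset.
  static final class FakeSendResult {
    final long offset;

    FakeSendResult(long offset) {
      this.offset = offset;
    }
  }

  // Builds the log line for either outcome, mirroring onSuccess and onFailure.
  static String describe(String message, FakeSendResult result, Throwable ex) {
    if (ex != null) {
      return "Unable to send message : " + message + " due to " + ex.getMessage();
    }
    return "Sent message: " + message + " with offset: " + result.offset;
  }

  // Attaches one completion callback that handles both success and failure.
  static CompletableFuture<String> withCallback(
      String message, CompletableFuture<FakeSendResult> future) {
    return future.handle((result, ex) -> describe(message, result, ex));
  }

  public static void main(String[] args) {
    // Simulate a send that has already succeeded at offset 7.
    CompletableFuture<FakeSendResult> ok =
        CompletableFuture.completedFuture(new FakeSendResult(7L));
    System.out.println(withCallback("hello", ok).join());

    // Simulate a send that failed.
    CompletableFuture<FakeSendResult> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("broker down"));
    System.out.println(withCallback("hello", failed).join());
  }
}
```

A single handle callback covers both outcomes, so the success and failure log lines stay next to each other, much like the two methods of ListenableFutureCallback.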
6. Conclusion
In this Spring Kafka multiple consumer Java configuration example, we learned to create multiple topics using the TopicBuilder API. Then we configured one consumer and one producer per created topic.
To run the above code, please follow the REST API endpoints created in Kafka JsonSerializer Example.
Happy Learning !!