Learn to configure multiple consumers listening to different Kafka topics in a Spring Boot application using Java-based bean configurations.

1. Objective

In this Kafka tutorial, we will learn the following:

  • Configuring Kafka in Spring Boot
  • Using Java configuration for Kafka
  • Configuring multiple Kafka consumers and producers
  • Configuring each consumer to listen to a separate topic
  • Configuring each producer to publish to a separate topic
  • Sending string (StringSerializer) as well as custom objects (JsonSerializer) as payloads

2. Dependencies

The project is a Maven project and includes the following dependencies:

  • spring-boot-starter-web: for creating REST APIs or the user interface.
  • spring-kafka: contains Spring classes, interfaces and annotations for interacting with the Kafka broker and other messaging functionalities.

3. Kafka Topics Configuration

We are creating two topics: test-log and user-log.

  1. test-log : is used for publishing simple string messages.
  2. user-log : is used for publishing serialized User objects.

Let’s note down a few crucial points.

  • Spring Kafka will automatically add topics for all beans of type NewTopic.
  • Using TopicBuilder, we can create new topics as well as refer to existing topics in Kafka.
  • Apart from the topic name, we can specify the number of partitions and the number of replicas for the topic.
  • If they are not specified, both the partition count and the replication factor default to 1. Newer Spring Kafka versions (2.6 and later) fall back to the broker defaults instead.
  • If you are not using Spring Boot, make sure to create a KafkaAdmin bean as well. Spring Boot creates it for us.

Double-check the partition count configured for each topic. If the configured number is greater than the number of partitions the topic already has on the Kafka broker, the new number is applied and the extra partitions are added.
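
The rule above can be modelled in a few lines of plain Java. This is only an illustrative sketch, not Spring Kafka's code: PartitionRuleSketch and resultingPartitions are hypothetical names, and the sketch assumes that a request for fewer partitions leaves the topic untouched, since Kafka does not support reducing a topic's partition count.

```java
/** Illustrative model only; not part of Spring Kafka or the Kafka client API. */
final class PartitionRuleSketch {

    /**
     * Returns the partition count an existing topic ends up with.
     * A larger request adds partitions; a smaller or equal request changes nothing.
     */
    static int resultingPartitions(int existing, int requested) {
        if (existing < 1 || requested < 1) {
            throw new IllegalArgumentException("Partition counts must be at least 1");
        }
        return Math.max(existing, requested);
    }

    public static void main(String[] args) {
        System.out.println(resultingPartitions(1, 3)); // prints 3: two partitions are added
        System.out.println(resultingPartitions(3, 1)); // prints 3: the topic is left as it is
    }
}
```

Because partitions can only be added, it is worth settling on the count before raising it in configuration.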

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class TopicConfig {

  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  @Value(value = "${general.topic.name}")
  private String topicName;

  @Value(value = "${user.topic.name}")
  private String userTopicName;

  // Topic for plain string messages
  @Bean
  public NewTopic generalTopic() {
    return TopicBuilder.name(topicName)
        .partitions(1) // illustrative value
        .replicas(1)   // illustrative value
        .build();
  }

  // Topic for serialized User objects
  @Bean
  public NewTopic userTopic() {
    return TopicBuilder.name(userTopicName)
        .partitions(1)
        .replicas(1)
        .build();
  }

  // Needed only if not using Spring Boot, which creates a KafkaAdmin for us
  @Bean
  public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
  }
}
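
The @Value placeholders in the configuration class above have to resolve to real properties, typically from application.properties. The sketch below lists one possible set of entries and loads them with java.util.Properties so the keys can be checked. The topic names are the ones used in this tutorial; the broker address localhost:9092 and the group ids general-group and user-group are assumptions for a local setup, and KafkaPropertiesSketch is a hypothetical helper class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper; only demonstrates which property keys the tutorial code expects. */
final class KafkaPropertiesSketch {

    // Assumed contents of src/main/resources/application.properties.
    // Broker address and group ids are placeholders for a local setup.
    static final String APPLICATION_PROPERTIES = String.join("\n",
        "kafka.bootstrapAddress=localhost:9092",
        "general.topic.name=test-log",
        "general.topic.group.id=general-group",
        "user.topic.name=user-log",
        "user.topic.group.id=user-group");

    /** Parses the text with the standard .properties parser. */
    static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(APPLICATION_PROPERTIES));
        } catch (IOException e) {
            throw new UncheckedIOException("Reading from memory should not fail", e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load();
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

Copying the five key=value lines into application.properties should be enough for Spring to resolve every placeholder used in this tutorial.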

4. Message Consumers Configuration

We are creating two consumers that listen to the two topics we created in the 3rd section (topic configuration).

The Kafka multiple consumer configuration involves the following classes:

  • DefaultKafkaConsumerFactory : is used to create new Consumer instances where all consumers share common configuration properties mentioned in this bean.
  • ConcurrentKafkaListenerContainerFactory : is used to build ConcurrentMessageListenerContainer. This factory is primarily for building containers for @KafkaListener annotated methods.
  • ConsumerConfig : holds the consumer configuration keys.
  • @KafkaListener : marks a method to be the target of a Kafka message listener on the specified topics.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.howtodoinjava.kafka.demo.model.User;

@Configuration
public class KafkaConsumerConfig {

  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  @Value(value = "${general.topic.group.id}")
  private String groupId;

  @Value(value = "${user.topic.group.id}")
  private String userGroupId;

  // 1. Consume string data from Kafka
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Keys and values on this topic are plain strings
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props);
  }

  // Default container factory, used by @KafkaListener methods that name no other factory
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> 
            kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory 
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  // 2. Consume user objects from Kafka
  @Bean
  public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId);
    // "*" trusts all packages for JSON deserialization; consider restricting it
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props, 
        new StringDeserializer(), 
        new JsonDeserializer<>(User.class));
  }

  // Container factory referenced by name from the User listener
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, User> 
            userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory 
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
  }
}

The listener methods that consume from the two topics live in a service class:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.howtodoinjava.kafka.demo.model.User;

@Service
public class KafKaConsumerService {

  private final Logger logger 
    = LoggerFactory.getLogger(KafKaConsumerService.class);

  // Uses the default kafkaListenerContainerFactory bean
  @KafkaListener(topics = "${general.topic.name}", 
      groupId = "${general.topic.group.id}")
  public void consume(String message) {
    logger.info("Message received -> {}", message);
  }

  // Uses the container factory configured for User payloads
  @KafkaListener(topics = "${user.topic.name}", 
      groupId = "${user.topic.group.id}",
      containerFactory = "userKafkaListenerContainerFactory")
  public void consume(User user) {
    logger.info("User created -> {}", user);
  }
}

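Finally, do not forget to enable Kafka listener annotated endpoints (@KafkaListener) by adding the @EnableKafka annotation to a configuration class. In a Spring Boot application, the Kafka auto-configuration normally does this already.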
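
The listener and the JSON deserializer above refer to a User class that this tutorial does not show. A minimal sketch of such a class follows; the fields userId, firstName and lastName are hypothetical and chosen only for illustration. The no-argument constructor and the getter/setter pairs are there because Jackson, which JsonSerializer and JsonDeserializer rely on, populates plain Java beans that way by default.

```java
/**
 * Hypothetical payload class; in the project it would be a public class
 * in the com.howtodoinjava.kafka.demo.model package.
 */
class User {

    private long userId;
    private String firstName;
    private String lastName;

    /** No-argument constructor, needed for default JSON deserialization. */
    public User() {
    }

    public User(long userId, String firstName, String lastName) {
        this.userId = userId;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public long getUserId() { return userId; }
    public void setUserId(long userId) { this.userId = userId; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    /** Readable form used when the listener logs the received object. */
    @Override
    public String toString() {
        return "User [userId=" + userId + ", firstName=" + firstName
            + ", lastName=" + lastName + "]";
    }
}

/** Small runner so the sketch can be executed on its own. */
final class UserSketchDemo {
    public static void main(String[] args) {
        System.out.println(new User(1L, "Jane", "Doe"));
    }
}
```

Overriding toString keeps the log line written by the User listener readable instead of showing an object hash.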

5. Message Producers Configuration

We are creating two producers that send messages to the two topics we created in the 3rd section (topic configuration).

The Kafka multiple producer configuration involves the following classes:

  • DefaultKafkaProducerFactory : is used to create singleton Producer instances for the provided config options.
  • KafkaTemplate : is used for executing send message operations in all supported ways.
  • ProducerConfig : holds the producer configuration keys.
  • ListenableFuture : is used to accept completion callbacks after sending messages. In Spring for Apache Kafka 3.0 and later, KafkaTemplate.send() returns a CompletableFuture instead.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.howtodoinjava.kafka.demo.model.User;

@Configuration
public class KafkaProducerConfig {

  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  // 1. Send string to Kafka
  @Bean
  public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  // 2. Send User objects to Kafka
  @Bean
  public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Values are User objects written as JSON
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  @Bean
  public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
  }
}

The service that publishes to both topics uses the two templates:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.howtodoinjava.kafka.demo.model.User;

@Service
public class KafKaProducerService {

  private static final Logger logger = 
      LoggerFactory.getLogger(KafKaProducerService.class);

  // 1. General topic with a string payload
  @Value(value = "${general.topic.name}")
  private String topicName;

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  // 2. Topic with user object payload
  @Value(value = "${user.topic.name}")
  private String userTopicName;

  @Autowired
  private KafkaTemplate<String, User> userKafkaTemplate;

  // Sends a string and logs the outcome once the broker responds
  public void sendMessage(String message) {
    ListenableFuture<SendResult<String, String>> future 
      = this.kafkaTemplate.send(topicName, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
      @Override
      public void onSuccess(SendResult<String, String> result) {
        logger.info("Sent message: " + message 
            + " with offset: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        logger.error("Unable to send message : " + message, ex);
      }
    });
  }

  // Sends a User object as JSON and logs the outcome
  public void saveCreateUserLog(User user) {
    ListenableFuture<SendResult<String, User>> future 
      = this.userKafkaTemplate.send(userTopicName, user);
    future.addCallback(new ListenableFutureCallback<SendResult<String, User>>() {
      @Override
      public void onSuccess(SendResult<String, User> result) {
        logger.info("User created: "
            + user + " with offset: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        logger.error("Unable to send user : " + user, ex);
      }
    });
  }
}
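
In Spring for Apache Kafka 3.0 and later, KafkaTemplate.send() returns a CompletableFuture rather than a ListenableFuture, so the addCallback style shown above would need to change on those versions. The plain-Java sketch below shows how the same success and failure handling could look with CompletableFuture.whenComplete. It does not talk to Kafka: SendCallbackSketch, logOutcome and the in-memory LOG list are hypothetical stand-ins, and the futures are completed by hand instead of by a real send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; uses only JDK classes and never contacts a broker. */
final class SendCallbackSketch {

    /** Collects log lines so the outcome can be inspected without a logging framework. */
    static final List<String> LOG = new ArrayList<>();

    /** Attaches success and failure handling, mirroring onSuccess and onFailure. */
    static <T> CompletableFuture<T> logOutcome(CompletableFuture<T> future, String payload) {
        return future.whenComplete((result, ex) -> {
            if (ex == null) {
                LOG.add("Sent message: " + payload + " with result: " + result);
            } else {
                LOG.add("Unable to send message: " + payload + " (" + ex.getMessage() + ")");
            }
        });
    }

    public static void main(String[] args) {
        // Simulated successful send
        logOutcome(CompletableFuture.completedFuture("offset=0"), "hello");

        // Simulated failed send
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        logOutcome(failed, "world");

        LOG.forEach(System.out::println);
    }
}
```

On a 3.x template, the equivalent call would presumably be logOutcome(kafkaTemplate.send(topicName, message), message), with the real logger in place of the LOG list.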

6. Conclusion

In this Spring Kafka multiple-consumer Java configuration example, we learned to create multiple topics using the TopicBuilder API. Then we configured one consumer and one producer per created topic.

To run the above code, please follow the REST API endpoints created in Kafka JsonSerializer Example.

Happy Learning !!

Source code on GitHub


