Spring Boot Kafka Multiple Consumers Example

Learn to configure multiple consumers listening to different Kafka topics in a Spring Boot application using Java-based bean configurations.

1. Objective

In this Kafka tutorial, we will learn the following:

  • Configuring Kafka in a Spring Boot application
  • Using Java configuration for Kafka
  • Configuring multiple Kafka consumers and producers
  • Configuring each consumer to listen to a separate topic
  • Configuring each producer to publish to a separate topic
  • Sending strings (StringSerializer) as well as custom objects (JsonSerializer) as payloads

2. Dependencies

The project is a Maven project and includes the following dependencies:

  • spring-boot-starter-web: for creating REST APIs or the user interface.
  • spring-kafka: contains the Spring classes, interfaces and annotations for interacting with the Kafka broker and other messaging functionalities.
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath /> <!-- lookup parent from repository -->
</parent>
 
<properties>
  <java.version>1.8</java.version>
</properties>
 
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
</dependencies>

3. Kafka Topics Configuration

We are creating two topics, i.e., test-log and user-log.

  1. test-log : is used for publishing simple string messages.
  2. user-log : is used for publishing serialized User objects (a minimal sketch of the User class follows below).
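
The User class from the com.howtodoinjava.kafka.demo.model package is used as the custom payload throughout this post but is not listed in this section. A minimal POJO sketch (the field names are assumptions for illustration):

package com.howtodoinjava.kafka.demo.model;

// Assumed model class; the field names are illustrative, not from the original post
public class User 
{
  private String name;
  private String department;

  // Jackson (used by JsonDeserializer) needs a no-argument constructor by default
  public User() {
  }

  public User(String name, String department) {
    this.name = name;
    this.department = department;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public String getDepartment() {
    return department;
  }

  public void setDepartment(String department) {
    this.department = department;
  }

  @Override
  public String toString() {
    return "User [name=" + name + ", department=" + department + "]";
  }
}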

Let’s note down a few crucial points.

  • Spring Kafka will automatically add topics for all beans of type NewTopic.
  • Using TopicBuilder, we can create new topics as well as refer to existing topics in Kafka.
  • Apart from the topic name, we can specify the number of partitions and the number of replicas for the topic.
  • By default, both the number of partitions and the replication factor are 1.
  • If you are not using Spring Boot, make sure to create the KafkaAdmin bean as well; Spring Boot creates it for us.

Make sure to verify the number of partitions given for any Kafka topic. If the given number of partitions is greater than the number of existing partitions in the Kafka broker, the new number will be applied and more partitions will be added.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
 
@Configuration
public class TopicConfig 
{
  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;
 
  @Value(value = "${general.topic.name}")
  private String topicName;
 
  @Value(value = "${user.topic.name}")
  private String userTopicName;
 
  @Bean
  public NewTopic generalTopic() {
    return TopicBuilder.name(topicName)
            .partitions(1)
            .replicas(1)
            .build();
  }
 
  @Bean
  public NewTopic userTopic() {
    return TopicBuilder.name(userTopicName)
            .partitions(1)
            .replicas(1)
            .build();
  }
   
  // If not using Spring Boot, declare the KafkaAdmin bean explicitly

  @Bean
  public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
  }
}
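
The @Value placeholders used in this and the following configuration classes resolve against application.properties. The original entries are not listed in this post; a minimal example, assuming a local broker (the group ids and bootstrap address are illustrative values):

kafka.bootstrapAddress=localhost:9092

general.topic.name=test-log
general.topic.group.id=test-log-group

user.topic.name=user-log
user.topic.group.id=user-log-group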

4. Message Consumers Configuration

We are creating two consumers that will listen to the two topics we created in the 3rd section (topic configuration).

The Kafka multiple consumer configuration involves the following classes:

  • DefaultKafkaConsumerFactory : is used to create new Consumer instances; all consumers share the common configuration properties set in this bean.
  • ConcurrentKafkaListenerContainerFactory : is used to build ConcurrentMessageListenerContainer. This factory is primarily for building containers for @KafkaListener annotated methods.
  • ConsumerConfig : holds the consumer configuration keys.
  • @KafkaListener : marks a method to be the target of a Kafka message listener on the specified topics.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.howtodoinjava.kafka.demo.model.User;
 
@Configuration
public class KafkaConsumerConfig 
{
  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;
 
  @Value(value = "${general.topic.group.id}")
  private String groupId;
 
  @Value(value = "${user.topic.group.id}")
  private String userGroupId;
 
  // 1. Consume string data from Kafka
 
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
        StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
        StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
  }
 
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> 
            kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory 
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }
 
  // 2. Consume user objects from Kafka
 
  @Bean
  public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props, 
        new StringDeserializer(), 
        new JsonDeserializer<>(User.class));
  }
 
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, User> 
            userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory 
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
  }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.howtodoinjava.kafka.demo.model.User;
 
@Service
public class KafKaConsumerService 
{
  private final Logger logger 
    = LoggerFactory.getLogger(KafKaConsumerService.class);
   
  @KafkaListener(topics = "${general.topic.name}", 
      groupId = "${general.topic.group.id}")
  public void consume(String message) {
    logger.info(String.format("Message recieved -> %s", message));
  }
 
  @KafkaListener(topics = "${user.topic.name}", 
      groupId = "${user.topic.group.id}",
      containerFactory = "userKafkaListenerContainerFactory")
  public void consume(User user) {
    logger.info(String.format("User created -> %s", user));
  }
}

Finally, do not forget to enable the Kafka listener annotated endpoints (@KafkaListener) using the @EnableKafka annotation.
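
For example, the annotation can sit on the main application class; a minimal sketch (the class name is an assumption):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

// @EnableKafka enables detection of @KafkaListener annotated methods
@EnableKafka
@SpringBootApplication
public class KafkaDemoApplication 
{
  public static void main(String[] args) {
    SpringApplication.run(KafkaDemoApplication.class, args);
  }
}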

5. Message Producers Configuration

We are creating two producers that will send messages to the two different topics we created in the 3rd section (topic configuration).

The Kafka multiple producers configuration involves the following classes:

  • DefaultKafkaProducerFactory : is used to create singleton Producer instances for the provided config options.
  • KafkaTemplate : is used for executing send message operations in all supported ways.
  • ProducerConfig : holds the producer configuration keys.
  • ListenableFuture : is used to accept completion callbacks after sending messages.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.howtodoinjava.kafka.demo.model.User;
 
@Configuration
public class KafkaProducerConfig 
{
  @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
   
  //1. Send string to Kafka
   
  @Bean
  public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return new DefaultKafkaProducerFactory<>(props);
  }
    
  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
  }
   
  //2. Send User objects to Kafka
  @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
 
    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
}
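
With JsonSerializer, each User instance is written to the user-log topic as a JSON document; JsonSerializer also adds type information headers such as __TypeId__ by default. Assuming the User sketch above, a record value would look like this:

{"name":"Alice","department":"Engineering"}

The producer service below uses both KafkaTemplate instances to send the payloads: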
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.howtodoinjava.kafka.demo.model.User;
 
@Service
public class KafKaProducerService 
{
  private static final Logger logger = 
      LoggerFactory.getLogger(KafKaProducerService.class);

  // 1. General topic with a string payload

  @Value(value = "${general.topic.name}")
  private String topicName;

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  // 2. Topic with a User object payload

  @Value(value = "${user.topic.name}")
  private String userTopicName;

  @Autowired
  private KafkaTemplate<String, User> userKafkaTemplate;

  public void sendMessage(String message) 
  {
    ListenableFuture<SendResult<String, String>> future 
      = this.kafkaTemplate.send(topicName, message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
      @Override
      public void onSuccess(SendResult<String, String> result) {
        logger.info("Sent message: " + message 
            + " with offset: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        logger.error("Unable to send message: " + message, ex);
      }
    });
  }

  public void saveCreateUserLog(User user) 
  {
    ListenableFuture<SendResult<String, User>> future 
      = this.userKafkaTemplate.send(userTopicName, user);

    future.addCallback(new ListenableFutureCallback<SendResult<String, User>>() {
      @Override
      public void onSuccess(SendResult<String, User> result) {
        logger.info("User created: "
            + user + " with offset: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        logger.error("Unable to send user: " + user, ex);
      }
    });
  }
}
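
To exercise both producers over HTTP, a thin REST controller can delegate to the service above. A minimal sketch (the endpoint paths and the class name are assumptions):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.howtodoinjava.kafka.demo.model.User;

// Hypothetical controller; the paths are illustrative, not from the original post
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController 
{
  @Autowired
  private KafKaProducerService producerService;

  // Publishes a plain string to the general topic, e.g. GET /kafka/publish/hello
  @GetMapping("/publish/{message}")
  public String publish(@PathVariable String message) {
    producerService.sendMessage(message);
    return "Message sent to Kafka topic";
  }

  // Publishes a User object as JSON to the user topic
  @PostMapping("/users")
  public String publishUser(@RequestBody User user) {
    producerService.saveCreateUserLog(user);
    return "User record sent to Kafka topic";
  }
}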

6. Conclusion

In this Spring Kafka multiple consumers Java configuration example, we learned to create multiple topics using the TopicBuilder API. Then we configured one consumer and one producer per created topic.

To run the above code, please follow the REST API endpoints created in the Kafka JsonSerializer Example.

Happy Learning !!

Source code on GitHub
