Spring Boot @KafkaListener and @KafkaHandler Example

The @KafkaListener and @KafkaHandler annotations are part of Spring for Apache Kafka integration. They provide convenient ways to consume and handle messages from Kafka topics.

1. Maven

To receive messages using @KafkaListener, we need to add the latest version Spring Kafka module, if not included already.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>3.0.7</version> <!-- version is optional in a spring boot project-->
</dependency>

2. Default Behavior

When we add the bare minimum required properties, Spring boot’s KafkaAutoConfiguration class creates and registers the DefaultKafkaConsumerFactory along with other necessary beans.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-task-group

The default autoconfiguration allows us to use the @KafkaListener annotated methods in a Spring @Component to receive the messages from the specified topics.

@Component
@Log
public class KafKaTopicListeners {

  private final Logger logger = LoggerFactory.getLogger(KafKaTopicListeners.class);

  @KafkaListener(topics = "general-task-topic")
  public void consume(TaskStatus taskStatus) {

    logger.info(String.format("Task status is updated : " + taskStatus));
  }
}

3. @KafkaListener Annotation

The @KafkaListener annotation is used to mark a method as a Kafka message listener on the specified topics. Processing of @KafkaListener annotations is performed by registering a KafkaListenerAnnotationBeanPostProcessor.

The @KafkaListener is a repeatable annotation and it can be declared several times on the same method (or class), implicitly generating @KafkaListeners container annotation.

Apart from providing the topic names, we can specify the consumer group and factory details as well.

@KafkaListener(
  id = "myListener",
  topics = "my-topic",
  groupId = "my-group",
  concurrency = "3",
  containerFactory = "kafkaListenerContainerFactory"
)
public void handleMessage(String message) {
	
	//...
}

Also, note that the annotated methods can also specify the following parameters:

  • ConsumerRecord: to access the raw Kafka message.
  • Acknowledgment: to manually acknowledge the message.
  • @Payload: binds a method parameter to the payload of a message.
  • @Header: binds a method parameter method to a message header.
  • @Headers: binds a method parameter method to all message headers.
  • MessageHeaders: headers for a message.
  • MessageHeaderAccessor: wrapper around MessageHeaders that provides extra features.
@KafkaListener(topics = "my-topic")
public void handleMessage(ConsumerRecord<String, String> record) {
	
	String key = record.key();
        String value = record.value();
	//...
}

Another example is the annotated method accepting message headers:

@KafkaListener(topics = "my-topic")
public void handleMessage(
	@Payload String message,
	@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
	@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {

	//...
}

We can use @KafkaListener annotation on a method as well as at the class level.

3.1. @KafkaListener at Method Level

When @KafkaListener is defined at the method level, a listener container is created for each method configured with a MethodKafkaListenerEndpoint.

@Component
public class KafKaTopicListeners {

  @KafkaListener(topics = "general-task-topic")
  public void handleMessage(String message) {

    //...
  }

  @KafkaListener(topics = "general-task-topic")
  public void handleMessage(TaskStatus taskStatus) {

    //...
  }

  @KafkaListener(topics = "another-topic")
  public void handleMessage(String message) {

    //...
  }
}

3.2. @KafkaListener at Class Level

When @KafkaListener is defined at the class level, a single message listener container is used to service all methods annotated with @KafkaHandler configured with a MultiMethodKafkaListenerEndpoint.

Please note that it is important that methods must not cause any ambiguity; a single method should be resolved as the handler for a particular inbound message.

@Component
@KafkaListener(topics = "general-task-topic")
@KafkaListener(topics = "another-topic")
public class KafKaTopicListeners {

  @KafkaHandler
  public void handleMessage(String message) {

    //...
  }

  @KafkaHandler
  public void handleMessage(TaskStatus taskStatus) {

    //...
  }

  @KafkaHandler
  public void handleMessage(TaskResult message) {

    //...
  }
}

3.3. Default Message Handler

When a message arrives, the method selection depends on the class type of the message. The class type is matched with a single non-annotated parameter or one that is annotated with @Payload. If there is no match found for the message type, we can designate a default fallback method to handle all such unmatched messages.

@KafkaHandler(isDefault = true)
public void handleMessage(Object message) {

  //...
}

4. Demo

For demo purposes, we are using the application created for KafkaTemplate example. We will be publishing the two messages. The first message payload is TaskStatus and we will assign a specific listener for it. The second message will be received by the default message handler.

@SpringBootApplication
public class Application implements CommandLineRunner {

  @Autowired
  KafkaTemplate kafkaTemplate;

  @Autowired
  KafkaProperties kafkaProperties;

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Override
  public void run(String... args) throws Exception {

  	//1st message
    kafkaTemplate.send("general-task-topic", 0, "taskId", new TaskStatus("taskId", "taskName", 50.0f, Status.RUNNING));

    //2nd message
    kafkaTemplate.send("general-task-topic", "test-message");
  }
} 

The message listeners for the above messages is:

@Component
@Log
@KafkaListener(topics = "general-task-topic", groupId = "task-group")
public class KafKaTopicListeners {

  private final Logger logger = LoggerFactory.getLogger(KafKaTopicListeners.class);

  @KafkaHandler
  public void handleMessage(TaskStatus taskStatus) {

    logger.info(String.format("Task status is updated : " + taskStatus));
  }

  @KafkaHandler(isDefault = true)
  public void handleMessage(Object message) {

    logger.info(String.format("The default message is : " + message.toString()));
  }
}

Run the application and observe the output:

... c.h.d.jms.service.KafKaTopicListeners    : Task status is updated : TaskStatus(taskId=taskId, taskName=taskName, percentageComplete=50.0, status=RUNNING)

... c.h.d.jms.service.KafKaTopicListeners    : Task status is updated : test-message

We successfully received both messages.

5. Conclusion

This Spring boot Kafka tutorial discussed the different ways we can use the @KafkaListener and @KafkaHandler annotations. We discussed the annotation attributes and annotated method parameters. Finally, we saw a demo that showed both annotations in action.

Happy Learning !!

Source Code on Github

Comments

Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.

Our Blogs

REST API Tutorial

Dark Mode

Dark Mode