The @KafkaListener and @KafkaHandler annotations are part of the Spring for Apache Kafka project. They provide a convenient way to consume and handle messages from Kafka topics.
1. Maven
To receive messages using @KafkaListener, we need to add the latest version of the Spring Kafka module, if it is not included already.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.7</version> <!-- version is optional in a Spring Boot project -->
</dependency>
2. Default Behavior
When we add the bare minimum required properties, Spring Boot's KafkaAutoConfiguration class creates and registers the DefaultKafkaConsumerFactory along with the other necessary beans.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-task-group
The default autoconfiguration allows us to use the @KafkaListener annotated methods in a Spring @Component to receive the messages from the specified topics.
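import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafKaTopicListeners {

    private final Logger logger = LoggerFactory.getLogger(KafKaTopicListeners.class);

    // Invoked for every record received from the topic
    @KafkaListener(topics = "general-task-topic")
    public void consume(TaskStatus taskStatus) {
        logger.info("Task status is updated : {}", taskStatus);
    }
}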
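Note that the listener above receives a TaskStatus object rather than a String. Spring Boot's default consumer value deserializer is StringDeserializer, so the payload typically has to be converted from JSON in some way. One possible configuration, given here only as a sketch, uses Spring Kafka's JsonDeserializer. The package name com.example.model is a placeholder assumption for wherever TaskStatus lives, and the producer is assumed to send JSON with type headers.

```properties
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Hypothetical package; list the package(s) that contain your payload classes
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.model
```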
3. @KafkaListener Annotation
The @KafkaListener annotation is used to mark a method as a Kafka message listener on the specified topics. Processing of @KafkaListener annotations is performed by registering a KafkaListenerAnnotationBeanPostProcessor.
The @KafkaListener is a repeatable annotation and it can be declared several times on the same method (or class), implicitly generating @KafkaListeners container annotation.
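The repeatable-annotation mechanism is a plain Java feature, so it can be illustrated without Kafka. The sketch below uses hypothetical @Listener and @Listeners annotations as stand-ins for @KafkaListener and @KafkaListeners (they are not Spring types) and shows that declaring the annotation twice makes the compiler wrap both declarations in the container annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class RepeatableListenerSketch {

    // Hypothetical stand-in for @KafkaListener
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @Repeatable(Listeners.class)
    @interface Listener {
        String topic();
    }

    // Hypothetical stand-in for the @KafkaListeners container annotation
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface Listeners {
        Listener[] value();
    }

    static class Handlers {
        // Two declarations on one method; the compiler stores them inside @Listeners
        @Listener(topic = "general-task-topic")
        @Listener(topic = "another-topic")
        public void handleMessage(String message) {
        }
    }

    private static Method handlerMethod() {
        try {
            return Handlers.class.getMethod("handleMessage", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Number of @Listener declarations found on the method
    static int countListeners() {
        return handlerMethod().getAnnotationsByType(Listener.class).length;
    }

    // True when the implicit container annotation is present on the method
    static boolean hasContainerAnnotation() {
        return handlerMethod().isAnnotationPresent(Listeners.class);
    }

    public static void main(String[] args) {
        System.out.println("listeners: " + countListeners());
        System.out.println("container present: " + hasContainerAnnotation());
    }
}
```

A framework that reads such annotations can then look up all declarations at once with getAnnotationsByType(), regardless of whether they were written individually or inside the container annotation.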
Apart from providing the topic names, we can specify the consumer group and factory details as well.
@KafkaListener(
    id = "myListener",
    topics = "my-topic",
    groupId = "my-group",
    concurrency = "3",
    containerFactory = "kafkaListenerContainerFactory"
)
public void handleMessage(String message) {
    //...
}
The annotated methods can also declare the following parameters:
ConsumerRecord: to access the raw Kafka message.
Acknowledgment: to manually acknowledge the message.
@Payload: binds a method parameter to the payload of a message.
@Header: binds a method parameter to a message header.
@Headers: binds a method parameter to all message headers.
MessageHeaders: the headers of a message.
MessageHeaderAccessor: a wrapper around MessageHeaders that provides extra features.
For example, the following listener accepts the raw ConsumerRecord:
@KafkaListener(topics = "my-topic")
public void handleMessage(ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    //...
}
Another example is the annotated method accepting message headers:
@KafkaListener(topics = "my-topic")
public void handleMessage(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        // In spring-kafka 3.x the constant is RECEIVED_PARTITION (RECEIVED_PARTITION_ID in 2.x)
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
    //...
}
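The Acknowledgment parameter listed earlier is only populated when the listener container uses a manual ack mode. A minimal configuration sketch, assuming the container factory auto-configured by Spring Boot is in use:

```properties
# Hand offset commits over to the listener, which calls Acknowledgment.acknowledge()
spring.kafka.listener.ack-mode=manual
```

With this setting, a listener method can declare an Acknowledgment parameter and call acknowledge() once the record has been processed.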
We can use the @KafkaListener annotation at the method level as well as at the class level.
3.1. @KafkaListener at Method Level
When @KafkaListener is defined at the method level, a separate listener container is created for each annotated method, and each container is configured with a MethodKafkaListenerEndpoint.
@Component
public class KafKaTopicListeners {

    @KafkaListener(topics = "general-task-topic")
    public void handleMessage(String message) {
        //...
    }

    @KafkaListener(topics = "general-task-topic")
    public void handleTaskStatus(TaskStatus taskStatus) {
        //...
    }

    @KafkaListener(topics = "another-topic")
    public void handleAnotherTopicMessage(String message) {
        //...
    }
}
3.2. @KafkaListener at Class Level
When @KafkaListener is defined at the class level, a single message listener container services all methods annotated with @KafkaHandler. The container is configured with a MultiMethodKafkaListenerEndpoint.
Note that the handler methods must not be ambiguous: exactly one method should be resolved as the handler for a particular inbound message.
@Component
@KafkaListener(topics = "general-task-topic")
@KafkaListener(topics = "another-topic")
public class KafKaTopicListeners {

    @KafkaHandler
    public void handleMessage(String message) {
        //...
    }

    @KafkaHandler
    public void handleMessage(TaskStatus taskStatus) {
        //...
    }

    @KafkaHandler
    public void handleMessage(TaskResult message) {
        //...
    }
}
3.3. Default Message Handler
When a message arrives, the method selection depends on the class type of the message. The class type is matched with a single non-annotated parameter or one that is annotated with @Payload. If there is no match found for the message type, we can designate a default fallback method to handle all such unmatched messages.
@KafkaHandler(isDefault = true)
public void handleMessage(Object message) {
    //...
}
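The selection rule described above can be pictured with a small plain-Java sketch. This is not Spring's implementation, only a simplified model of it: the HandlerDispatchSketch class, its register methods, and the two-field TaskStatus are hypothetical names introduced for illustration. Handlers are looked up by payload type, more than one match is treated as an ambiguity error, and unmatched payloads go to the default handler.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class HandlerDispatchSketch {

    // Hypothetical payload type standing in for the article's TaskStatus
    static class TaskStatus {
        final String taskId;
        final String status;

        TaskStatus(String taskId, String status) {
            this.taskId = taskId;
            this.status = status;
        }
    }

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private Function<Object, String> defaultHandler;

    // Plays the role of a @KafkaHandler method with a parameter of the given type
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Plays the role of @KafkaHandler(isDefault = true)
    void registerDefault(Function<Object, String> handler) {
        this.defaultHandler = handler;
    }

    String dispatch(Object payload) {
        // Collect every handler whose parameter type accepts the payload
        List<Function<Object, String>> matches = new ArrayList<>();
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                matches.add(entry.getValue());
            }
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Ambiguous handlers for " + payload.getClass().getName());
        }
        if (matches.size() == 1) {
            return matches.get(0).apply(payload);
        }
        if (defaultHandler == null) {
            throw new IllegalStateException("No handler for " + payload.getClass().getName());
        }
        // No typed handler matched, so fall back to the default handler
        return defaultHandler.apply(payload);
    }

    static HandlerDispatchSketch demo() {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(TaskStatus.class,
                ts -> "Task status is updated : " + ts.taskId + "=" + ts.status);
        dispatcher.registerDefault(msg -> "The default message is : " + msg);
        return dispatcher;
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = demo();
        System.out.println(dispatcher.dispatch(new TaskStatus("taskId", "RUNNING")));
        System.out.println(dispatcher.dispatch("test-message"));
    }
}
```

In this model, registering two handlers whose types both accept the same payload (for example Object and String) leads to the ambiguity error, which mirrors why each inbound message should resolve to a single handler method.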
4. Demo
For demo purposes, we are using the application created for the KafkaTemplate example. We will publish two messages. The payload of the first message is a TaskStatus, and we will assign a specific handler to it. The second message will be received by the default message handler.
@SpringBootApplication
public class Application implements CommandLineRunner {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Autowired
    KafkaProperties kafkaProperties;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        //1st message
        kafkaTemplate.send("general-task-topic", 0, "taskId", new TaskStatus("taskId", "taskName", 50.0f, Status.RUNNING));
        //2nd message
        kafkaTemplate.send("general-task-topic", "test-message");
    }
}
The message listener for the above messages is:
@Component
@KafkaListener(topics = "general-task-topic", groupId = "task-group")
public class KafKaTopicListeners {

    private final Logger logger = LoggerFactory.getLogger(KafKaTopicListeners.class);

    // Selected when the payload is a TaskStatus
    @KafkaHandler
    public void handleMessage(TaskStatus taskStatus) {
        logger.info("Task status is updated : {}", taskStatus);
    }

    // Fallback for every payload type without a dedicated handler
    @KafkaHandler(isDefault = true)
    public void handleMessage(Object message) {
        logger.info("The default message is : {}", message);
    }
}
Run the application and observe the output:
... c.h.d.jms.service.KafKaTopicListeners : Task status is updated : TaskStatus(taskId=taskId, taskName=taskName, percentageComplete=50.0, status=RUNNING)
... c.h.d.jms.service.KafKaTopicListeners : The default message is : test-message
We successfully received both messages.
5. Conclusion
This Spring Boot Kafka tutorial discussed the different ways we can use the @KafkaListener and @KafkaHandler annotations. We discussed the annotation attributes and the parameters that annotated methods can accept. Finally, we saw a demo that showed both annotations in action.
Happy Learning !!