Using KafkaTemplate with Spring Boot

The KafkaTemplate follows the typical Spring template programming model for interacting with a Kafka cluster. It supports publishing new messages to, and receiving messages from, a specified topic.

1. Maven

The KafkaTemplate is part of the spring-kafka module, which provides the necessary support for interacting with a Kafka cluster.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>3.0.7</version> <!-- version is optional in a spring boot project-->
</dependency>

2. Default Behavior

By default, Spring Boot auto-configuration creates a KafkaTemplate instance that connects to the bootstrap servers specified in the properties file.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=default-task-group

@Autowired
KafkaTemplate<String, Object> kafkaTemplate;

kafkaTemplate.send("my-topic", messageObject);

Since Spring Boot 3.0 (which uses Spring for Apache Kafka 3.0), the send methods return CompletableFuture instead of ListenableFuture. These methods are non-blocking, allowing the application to continue executing without waiting for the acknowledgment.
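For example, we can attach a callback to the returned future to react to the outcome of an asynchronous send; a minimal sketch (the topic name and payload are illustrative):

```java
CompletableFuture<SendResult<String, Object>> future =
    kafkaTemplate.send("my-topic", messageObject);

future.whenComplete((result, ex) -> {
  if (ex == null) {
    // The broker acknowledged the record; the metadata tells us where it landed
    RecordMetadata metadata = result.getRecordMetadata();
    System.out.println("Sent to partition " + metadata.partition()
        + " at offset " + metadata.offset());
  } else {
    System.err.println("Send failed: " + ex.getMessage());
  }
});
```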

Also, by default, Spring Boot configures the StringSerializer for both keys and values. However, we can configure custom serializers based on the specific data types. For example, in this article, we are using the JsonSerializer and JsonDeserializer classes.

It is worth knowing that the underlying producer selects the topic partition based on a hash of the message key. If the key is null, the producer's partitioner distributes records across partitions itself (round-robin in older clients; sticky partitioning since Kafka 2.4).
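To illustrate, records sharing a key always land in the same partition, which preserves their relative order; a quick sketch (the topic and keys are illustrative):

```java
// Both records hash to the same partition, so a consumer sees them in order
kafkaTemplate.send("task-topic", "task-1", "started");
kafkaTemplate.send("task-topic", "task-1", "finished");

// Null key: the producer's partitioner chooses the partition
kafkaTemplate.send("task-topic", "no key on this one");
```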

3. Custom Initialization of KafkaTemplate

There are two ways to customize the KafkaTemplate for sending and receiving messages.

3.1. Properties Configuration

We can use the spring.kafka.producer.* and spring.kafka.consumer.* properties to configure the underlying producer and consumer factories.

In the following configuration, we are using the JsonSerializer and JsonDeserializer classes to serialize and deserialize message values, apart from setting other properties.

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=default-task-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

3.2. Java Configuration

A similar configuration can be done in a @Configuration class as follows:

@Configuration
public class KafkaConfig {

  @Bean
  public ConsumerFactory<String, TaskStatus> consumerFactory() {

    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "default-task-group");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return new DefaultKafkaConsumerFactory<>(configProps);
  }


  @Bean
  public ProducerFactory<String, TaskStatus> producerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
  }

  @Bean
  public KafkaTemplate<String, TaskStatus> kafkaTemplate() {
    var kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setConsumerFactory(consumerFactory());
    return kafkaTemplate;
  }
}
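The examples in this article use a TaskStatus payload. The class itself is not shown here, but its shape can be inferred from the demo output in section 7; one possible sketch (the output format suggests a Lombok @Data class, though a plain record works equally well):

```java
// Hypothetical shape of the TaskStatus payload used throughout this article
enum Status { RUNNING, FINISHED }

record TaskStatus(String taskId, String taskName, float percentageComplete, Status status) { }
```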

4. Sending Messages

4.1. Methods

The KafkaTemplate wraps a producer and provides the following convenient methods for sending data to Kafka topics:

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);
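The send(Message<?>) variant reads the destination topic and key from message headers rather than from method arguments; a minimal sketch using Spring's MessageBuilder (the topic name and key are illustrative):

```java
Message<String> message = MessageBuilder
    .withPayload("task finished")
    .setHeader(KafkaHeaders.TOPIC, "general-task-topic")
    .setHeader(KafkaHeaders.KEY, "taskId")
    .build();

kafkaTemplate.send(message);
```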

4.2. Default Topic Configuration

Note that the sendDefault() methods require a default topic, which is configured on the KafkaTemplate itself using its setDefaultTopic() method:

@Bean
public KafkaTemplate<String, TaskStatus> kafkaTemplate() {

    KafkaTemplate<String, TaskStatus> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic("my-default-topic");
    return kafkaTemplate;
}
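With the default topic in place, we can publish without naming a topic on every call; a short sketch (the payload values are illustrative):

```java
// Both records go to "my-default-topic"
kafkaTemplate.sendDefault(new TaskStatus("taskId", "taskName", 50.0f, Status.RUNNING));
kafkaTemplate.sendDefault("taskId", new TaskStatus("taskId", "taskName", 100.0f, Status.FINISHED));
```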

5. Receiving Messages

Starting with version 2.8 of Spring for Apache Kafka, the KafkaTemplate has four receive() methods:

ConsumerRecord<K, V> receive(String topic, int partition, long offset);

ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);

Note that we need to know the partition and offset of the record(s) we want to retrieve, and that a new Consumer is created (and closed) for each receive operation. For these reasons, it is advisable to use @KafkaListener for receiving messages, as it maintains the partition and offset metadata internally.
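For comparison, a minimal @KafkaListener sketch (the topic and group names are illustrative):

```java
@Component
public class TaskStatusListener {

  @KafkaListener(topics = "general-task-topic", groupId = "default-task-group")
  public void onTaskStatus(TaskStatus status) {
    // The listener container tracks partitions and offsets for us
    System.out.println("Received: " + status);
  }
}
```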

6. Error Handling

In case of any error, the send and receive methods throw a KafkaException (or a subtype), and it is the responsibility of the application to handle it. For a blocking send, we can catch the exception around get():

try {
    kafkaTemplate.send("my-topic", messageObject).get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    // The cause is typically a KafkaException
    // ...
} catch (InterruptedException | TimeoutException e) {
    // Handle interruption or timeout
    // ...
}

For non-blocking sends, the exception is delivered to a callback registered on the returned CompletableFuture instead.

7. Demo

In the following example, we send two messages using the KafkaTemplate and then receive them back using the kafkaTemplate.receive() method. Note that we change the offset when retrieving the messages: for the first message the offset is 0, and for the second message the offset is 1.

@SpringBootApplication
public class LongRunningTaskApplication implements CommandLineRunner {

  @Autowired
  KafkaTemplate<String, TaskStatus> kafkaTemplate;

  @Autowired
  KafkaProperties kafkaProperties;

  public static void main(String[] args) {
    SpringApplication.run(LongRunningTaskApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {

    kafkaTemplate.send("general-task-topic", 0, "taskId", new TaskStatus("taskId", "taskName", 50.0f, Status.RUNNING));
    kafkaTemplate.send("general-task-topic", 0, "taskId", new TaskStatus("taskId", "taskName", 100.0f, Status.FINISHED));

    var message1 = kafkaTemplate.receive("general-task-topic", 0, 0L);
    var message2 = kafkaTemplate.receive("general-task-topic", 0, 1L);

    System.out.println(message1.value());
    System.out.println(message2.value());
  }
}

Program output:

TaskStatus(taskId=taskId, taskName=taskName, percentageComplete=50.0, status=RUNNING)
TaskStatus(taskId=taskId, taskName=taskName, percentageComplete=100.0, status=FINISHED)

8. Conclusion

In this short Apache Kafka tutorial, we learned the basics of KafkaTemplate. We also learned how to configure the KafkaTemplate with custom properties and with producer and consumer factories. Finally, we tested the configuration with a demo program.

Happy Learning !!

Source Code on Github
