Apache Kafka using Spring Boot

Learn to create a Spring Boot application and run a local Apache Kafka broker instance using Docker Compose. We will also look at how to configure a Kafka producer and consumer and explore different ways to produce and consume messages. Finally, we will learn to handle Kafka errors and retry in case of failures.

1. Introduction to Apache Kafka

Apache Kafka is an open-source distributed event streaming platform for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. It is based on a publish/subscribe messaging model and is often called a “distributed commit log”.

Kafka possesses three key capabilities:

  • It enables applications to publish or subscribe to data or event streams.
  • It stores records accurately, in a fault-tolerant and durable manner.
  • It processes records in real time.

2. Installing Kafka Locally using Docker

The following docker-compose.yml creates a single-node Kafka setup with one ZooKeeper instance and one broker. The depends_on setting makes Docker Compose start the ZooKeeper container before the Kafka broker and stop it after the broker.

The configuration is a starting point. If a more resilient Kafka setup is needed, we can extend it, for example with additional brokers, based on the specific requirements.

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:latest
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://broker:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

3. Getting Started with Kafka using Spring Boot

3.1. Maven

To connect a Spring Boot application to Kafka, we add the spring-kafka dependency. The appropriate version is resolved from Spring Boot’s BOM.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Additionally, we can add the spring-kafka-test dependency. It provides an embedded Kafka broker, an in-memory Kafka instance that we can use to run our tests.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
</dependency>

3.2. Minimum Required Kafka Properties

By default, Spring Boot autoconfiguration creates a KafkaTemplate that connects to the bootstrap servers specified by the spring.kafka.bootstrap-servers property. It is, in fact, the minimum configuration needed to connect to a remote Kafka instance and produce messages using KafkaTemplate.

spring.kafka.bootstrap-servers=localhost:9092

For consuming the messages, we need an additional property spring.kafka.consumer.group-id for configuring the consumer group. It is required to configure the DefaultKafkaConsumerFactory and enable the usage of @KafkaListener for receiving the messages.

spring.kafka.consumer.group-id=default-task-Group

3.3. Default Autoconfiguration

Spring Boot loads the specified configuration properties using the KafkaProperties class. It is annotated with @ConfigurationProperties(prefix = "spring.kafka"), ensuring that all the properties under the spring.kafka namespace are loaded.

Spring Boot leverages the KafkaAutoConfiguration class, which automatically creates a KafkaTemplate along with other necessary beans such as ProducerFactory, ConsumerFactory, and KafkaAdmin. These bean methods are annotated with @ConditionalOnMissingBean, meaning that Spring creates a bean only when we have not defined one ourselves.

We can autowire these beans and use them:

@Autowired
KafkaTemplate kafkaTemplate;

4. Creating Kafka Topics

To create a new topic, we can define the NewTopic beans in a @Configuration class that will be loaded during application startup. It will create a new topic only if there is no existing topic with the same name.

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic inventoryEventsTopic() {

        return TopicBuilder.name("inventory-events")
                .partitions(1)
                .replicas(1)
                .build();
    }
}

We can also create Kafka topics programmatically at runtime, using an AdminClient built from the configuration of the auto-configured KafkaAdmin bean.

@Service
public class TaskService {

  @Autowired
  KafkaAdmin kafkaAdmin;

  private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {

    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention

    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);

    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
      //Blocking call to make sure topic is created
      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }
  }

  //...
}

5. Producing Messages

5.1. Producer Configuration

Kafka Producers are thread-safe, which means that using a single instance throughout the application context can result in better performance. Consequently, it is recommended to use a single instance of KafkaTemplate, as it is also thread-safe.

To create messages, we first need to configure a ProducerFactory. We use spring.kafka.producer.* properties to let Spring automatically configure the underlying producer factories.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Alternatively, we can use a Java configuration class to define the ProducerFactory and KafkaTemplate beans, as shown below:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private List<String> bootstrapAddress;

    @Bean
    public ProducerFactory<?, ?> producerFactory() {

        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());
    }
}

5.2. Using KafkaTemplate to Publish Messages

The KafkaTemplate wraps the producer and provides convenient overloaded send(...) methods for sending data to Kafka topics. For example, send(String topic, K key, V data) takes three parameters: topic name, key, and value. It returns a CompletableFuture<SendResult<K, V>> (since Spring Kafka 3.0), which lets us write non-blocking code.

@Slf4j
@Component
public class InventoryEventProducer {

    @Value("${spring.kafka.topic}")
    public String topic;

    @Autowired
    private KafkaTemplate<Integer, Object> kafkaTemplate;

    public CompletableFuture<SendResult<Integer, Object>> sendInventoryEvent(InventoryEvent event) {

        // The inventory id is used as the record key
        var key = event.getInventoryId();

        // send() is asynchronous; the returned future completes once the send succeeds or fails
        return kafkaTemplate.send(topic, key, event);
    }
}

Additionally, we can also specify some metadata about the message in the headers as shown below:

List<Header> recordHeader = List.of(new RecordHeader("event-source", "inventory-event-producer".getBytes()));
var producerRecord = new ProducerRecord<>(topic, null, key, value, recordHeader);

kafkaTemplate.send(producerRecord);

In this case, the partition is set to null so that the producer's partitioner decides which partition receives the message, typically based on the record key.
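As a rough illustration of what happens when no partition is given and the record has a key, the sketch below maps a key to a partition by hashing its bytes. It is a simplified model only: Kafka's built-in partitioner hashes the serialized key with murmur2, whereas the hypothetical partitionFor helper here uses Arrays.hashCode as a stand-in, so the partition numbers will not match what a real producer picks. The property it demonstrates is the same: equal keys always map to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitionSketch {

    // Hypothetical helper: maps serialized key bytes to a partition index.
    // Arrays.hashCode is only a stand-in for the murmur2 hash Kafka uses.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "inventory-42".getBytes(StandardCharsets.UTF_8);
        int first = partitionFor(key, 3);
        int second = partitionFor(key, 3);
        System.out.println("same key, same partition: " + (first == second));
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions of a topic can move existing keys to a different partition.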

5.3. Using RoutingKafkaTemplate

The RoutingKafkaTemplate allows selecting the producer at runtime based on the destination topic name. It requires a map of java.util.regex.Pattern to ProducerFactory instances.

@Bean 
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory<Object, Object> pf) {
  Map<String, Object> configMap = new HashMap<>(pf.getConfigurationProperties());
  configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
  
  DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configMap);
  context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

  Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
  map.put(Pattern.compile("default-topic"), bytesPF);
  map.put(Pattern.compile("inventory-events"), pf); // Default PF with JsonSerializer
  return new RoutingKafkaTemplate(map);
}
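The lookup that the pattern map implies can be modelled in plain Java. The sketch below is a simplified illustration, not RoutingKafkaTemplate's actual code: the resolve helper and the string values standing in for producer factories are hypothetical. It walks the map in insertion order and returns the first entry whose pattern matches the whole topic name, which is why an ordered map such as LinkedHashMap is used and why more specific patterns should be added first.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class PatternRoutingSketch {

    // Returns the value of the first pattern (in insertion order) that matches the full topic name.
    static String resolve(Map<Pattern, String> routes, String topic) {
        for (Map.Entry<Pattern, String> entry : routes.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        throw new IllegalStateException("No producer factory matches topic: " + topic);
    }

    public static void main(String[] args) {
        Map<Pattern, String> routes = new LinkedHashMap<>();
        routes.put(Pattern.compile("inventory-.*"), "jsonFactory"); // specific pattern first
        routes.put(Pattern.compile(".*"), "bytesFactory");          // catch-all last

        System.out.println(resolve(routes, "inventory-events")); // jsonFactory
        System.out.println(resolve(routes, "default-topic"));    // bytesFactory
    }
}
```

If the catch-all pattern were inserted first, every topic would resolve to bytesFactory and the more specific entry would never be reached.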

The following example shows how to use the same template to send to different topics, with each producer using a different value serializer.

@Slf4j
@Component
public class RoutingKafkaProducer {

    @Autowired
    private RoutingKafkaTemplate routingTemplate;

    public void sendDefaultTopic(String message) {
        routingTemplate.send("default-topic", message.getBytes());
    }

    public void sendInventoryEvent(InventoryEvent inventoryEvent) {
        routingTemplate.send("inventory-events", inventoryEvent);
    }
}

5.4. Error Handling and Recovery

In real-life applications, sending messages to Kafka will occasionally fail due to unforeseen conditions such as network downtime. We can handle such cases by utilizing the CompletableFuture returned by the send() method.

The CompletableFuture.whenComplete() method gives us access to both the successful result and the failure, so we can handle each outcome appropriately.

public CompletableFuture<SendResult<Integer, Object>> sendInventoryEvent_ProducerRecord(InventoryEvent inventoryEvent) throws JsonProcessingException {

        //...
        var completableFuture = kafkaTemplate.send(producerRecord);

        return completableFuture.whenComplete(((sendResult, throwable) -> {
            if (throwable != null) {
                handleFailure(key, inventoryEvent, throwable);
            } else {
                handleSuccess(key, inventoryEvent, sendResult);
            }
        }));
}

private void handleSuccess(Integer key, Object value, SendResult<Integer, Object> sendResult) {

        log.info("Message sent successfully for the key: {} and the value: {}, partition is: {}",
                key, value, sendResult.getRecordMetadata().partition());
}

private void handleFailure(Integer key, Object value, Throwable throwable) {

        log.error("Error sending message and exception is {}", throwable.getMessage(), throwable);
}

Within the handleFailure() method, we have the flexibility to incorporate our personalized approach for handling and recovering from failures. This may involve redirecting the failed messages to a different topic or persisting them in a database for further analysis.
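To make the recovery idea concrete without a running broker, here is a minimal sketch in plain Java. The names are hypothetical: track stands in for a producer method, and the in-memory parkedPayloads list stands in for a database table or a fallback topic. It shows that whenComplete() runs the failure branch only for a failed future and that the returned future still carries the original outcome.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class SendFailureSketch {

    // Stand-in for a database table or a fallback topic that keeps failed payloads.
    static final List<String> parkedPayloads = new CopyOnWriteArrayList<>();

    // Attaches failure handling to a send future and returns the resulting future.
    static CompletableFuture<String> track(String payload, CompletableFuture<String> sendFuture) {
        return sendFuture.whenComplete((result, throwable) -> {
            if (throwable != null) {
                parkedPayloads.add(payload); // keep the payload for later analysis or resend
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unreachable"));

        track("event-1", CompletableFuture.completedFuture("ack"));
        CompletableFuture<String> tracked = track("event-2", failed);

        System.out.println(parkedPayloads);                     // [event-2]
        System.out.println(tracked.isCompletedExceptionally()); // true
    }
}
```

Since the failure is not swallowed, callers that chain further stages on the returned future still see the exception.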

5.5. Acknowledgement and Retry

Kafka producers use the acks setting to control how many replicas must acknowledge a record before the write is considered successful. We can also set retries, the maximum number of times the producer resends a record before reporting a failure, and retry.backoff.ms, the time to wait between those attempts.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        acks: all
        retries: 10
        retry.backoff.ms: 1000
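When the producer is configured in Java rather than YAML, the same three settings go into the configuration map passed to the producer factory. The sketch below uses Kafka's plain property names as string keys so that it runs without any dependency; in a real configuration class the ProducerConfig constants (ACKS_CONFIG, RETRIES_CONFIG, RETRY_BACKOFF_MS_CONFIG) would normally be used instead. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerRetryConfigSketch {

    // Builds the acknowledgment and retry overrides using Kafka's plain property names.
    static Map<String, Object> retryOverrides() {
        Map<String, Object> config = new HashMap<>();
        config.put("acks", "all");             // wait for all in-sync replicas
        config.put("retries", 10);             // resend a failed record up to 10 times
        config.put("retry.backoff.ms", 1000L); // wait 1 second between attempts
        return config;
    }

    public static void main(String[] args) {
        retryOverrides().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

These entries can be merged into the configProps map of a producer configuration class before the DefaultKafkaProducerFactory is created.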

6. Consuming Messages

To consume messages, Spring Kafka provides a couple of ways. Here are some common approaches:

  • @KafkaListener and @KafkaHandler Annotations
  • Implementing MessageListener Interface

The @KafkaListener annotation is the most straightforward and recommended way.

6.1. Configuration

To configure the underlying consumer factories, we can use the spring.kafka.consumer.* properties. Here’s an example configuration that sets the key and value deserializers and other properties:

spring:
  kafka:
    topic: "inventory-events"
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    consumer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*'

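While using JsonSerializer and JsonDeserializer, we might get the following error:

Caused by: java.lang.IllegalArgumentException: The class 'com.howtodoinjava.demo.kafka.consumer.model.InventoryEvent' is not in the trusted packages: [java.util, java.lang].

The error occurs because JsonDeserializer only deserializes classes from trusted packages (java.util and java.lang by default), and the model class, which here lives in separate packages for the producer and the consumer, is not among them.

To solve this issue, we add spring.json.trusted.packages: '*' to the consumer properties. The wildcard trusts every package, which is convenient for local development; listing only the model packages is the stricter choice.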

If we are NOT using Spring Boot then we need to use @EnableKafka annotation on the configuration class to enable the detection of @KafkaListener annotation.

@Configuration
@EnableKafka
public class InventoryEventsConsumerConfig {

        // code here...
}

6.2. Consuming Messages with @KafkaListener and @KafkaHandler

The @KafkaListener annotation is used to designate a bean method as a listener for a specific topic(s). It simplifies the process of building a Kafka consumer and internally utilizes ConcurrentMessageListenerContainer for message consumption.

@Component
@Slf4j
public class InventoryEventsConsumer {

    @KafkaListener(topics = {"inventory-events"})
    public void onMessage(ConsumerRecord<Integer, InventoryEvent> consumerRecord) {

        log.info("Consumer Record: {}", consumerRecord);
    }
}

Spring also supports retrieval of messages using @Payload and headers info using @Header annotation:

@KafkaListener(topics = {"inventory-events"})
public void onMessage(
    @Payload String message,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    // Named RECEIVED_PARTITION_ID before Spring Kafka 3.0
    @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {

    //...
}

6.3. Consuming Messages with MessageListener Interface

By implementing MessageListener interface and its onMessage() method, we can create a custom message listener to handle the received messages.

@Component
public class MyMessageListener implements MessageListener<Integer, InventoryEvent> {

    @Override
    public void onMessage(ConsumerRecord<Integer, InventoryEvent> record) {

        // The value type matches the listener's generic type
        InventoryEvent event = record.value();
        // Process the received event
    }
}

6.4. Configuring Consumer Group and Partition Assignment

When configuring a Kafka listener, it’s a best practice to include a groupId parameter to identify the consumer group to which the consumer belongs. Multiple listeners can be created for a single topic, each with a unique groupId. A consumer can also listen to messages from multiple topics.

@KafkaListener(topics = {"inventory-events", "another-topic"}, groupId = "inventory-consumer-group-1")
public void onMessage(ConsumerRecord<Integer, Object> consumerRecord) {

    // Message handling logic...
}

For Kafka topics with multiple partitions, a @KafkaListener can explicitly subscribe to a specific partition within the topic and specify an initial offset.

@KafkaListener(
    topicPartitions = @TopicPartition(topic = "inventory-events",
    partitionOffsets = { 
        @PartitionOffset(partition = "0", initialOffset = "0"), 
        @PartitionOffset(partition = "3", initialOffset = "0")}
    ))
public void onMessage(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {

    // Message handling logic...
}

If we don’t need to set the offset, we can use the partitions property of the @TopicPartition annotation to specify only the partitions.

@KafkaListener(topicPartitions = @TopicPartition(topic = "inventory-events", partitions = { "0", "1" }))
public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {

    //....
}

6.5. Acknowledgement and Offset Commit

In Kafka, there are multiple options for committing offsets. If the enable.auto.commit consumer property is set to true, the Kafka consumer auto-commits offsets according to its configuration. If it is set to false, the listener container commits offsets according to its AckMode setting. The default AckMode is BATCH.

To set an AckMode, we override the default kafkaListenerContainerFactory bean in our configuration.

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
            ObjectProvider<ContainerCustomizer<Object, Object, 
            ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {

  ConcurrentKafkaListenerContainerFactory<Object, Object> factory 
    = new ConcurrentKafkaListenerContainerFactory<>();

  // 'properties' is assumed to be an injected KafkaProperties field of this configuration class
  configurer.configure(factory, kafkaConsumerFactory
      .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>
        (this.properties.buildConsumerProperties())));

  kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  return factory;
}

Next, in the message consumer class, we can implement the AcknowledgingMessageListener interface to access the ConsumerRecord and the Acknowledgment instance. This allows us to acknowledge Kafka messages manually.

@Component
@Slf4j
public class InventoryEventsConsumerManualOffset implements AcknowledgingMessageListener<Integer, String> {

    @Override
    @KafkaListener(topics = "inventory-events")
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord, Acknowledgment acknowledgment) {
        log.info("Consumer Record: {}", consumerRecord);
        acknowledgment.acknowledge();
    }
}

6.6. Configuring Concurrency

The ConcurrentKafkaListenerContainerFactory lets us run several consumer threads for the same listener within one application through its setConcurrency() method. This proves beneficial when we do not rely on cloud services to scale the application based on load. Setting the concurrency higher than the number of partitions leaves the extra consumers idle.

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(...) {

  ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  
  //...  
  factory.setConcurrency(3);
  return factory;
}
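To see why the concurrency value should be chosen with the partition count in mind, the following sketch spreads partitions over consumer slots in round-robin fashion. It is a simplified model with a hypothetical assign helper; in a real application the assignment is decided by Kafka's partition assignor, not by this code.

```java
import java.util.ArrayList;
import java.util.List;

final class ConcurrencySketch {

    // Round-robin spread of partition ids over consumer slots; a stand-in for Kafka's assignor.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> slots = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            slots.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitions; partition++) {
            slots.get(partition % concurrency).add(partition);
        }
        return slots;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // [[0, 3], [1, 4], [2, 5]]
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

In the second call, the third consumer receives no partition, which is the idle-consumer situation that a concurrency of 3 produces on a topic with two partitions.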

6.7. Error Handling and Retry

Error handlers, such as the DefaultErrorHandler, utilize a BackOff mechanism to determine the interval between retry attempts. We can choose between different BackOff strategies to customize the behavior.

Here’s an example of configuring a DefaultErrorHandler with a FixedBackOff strategy:

@Bean
public DefaultErrorHandler errorHandler() {

    FixedBackOff fixedBackOff = new FixedBackOff(1000L, 2);
    return new DefaultErrorHandler(fixedBackOff);
}

Alternatively, we can use the ExponentialBackOffWithMaxRetries strategy, which increases the interval between retry as the number of attempts grows until reaching a specified maximum threshold.

@Bean 
public DefaultErrorHandler errorHandler() {

        var exponentialBackOff = new ExponentialBackOffWithMaxRetries(2);
        exponentialBackOff.setInitialInterval(1000L);
        exponentialBackOff.setMultiplier(2L);
        exponentialBackOff.setMaxInterval(4000L);
        return new DefaultErrorHandler(exponentialBackOff);
}
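The effect of these settings is easier to see as a list of wait times. The sketch below is a plain-Java model of the schedule, assuming the first wait equals the initial interval, each later wait is multiplied by the multiplier, and every wait is capped at the maximum interval. The intervals helper is hypothetical and is not part of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;

final class BackOffSketch {

    // Computes the wait (in milliseconds) before each retry under a capped exponential schedule.
    static List<Long> intervals(long initial, double multiplier, long max, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long current = initial;
        for (int retry = 0; retry < maxRetries; retry++) {
            result.add(Math.min(current, max));
            current = (long) Math.min(current * multiplier, (double) max);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the configuration above: 2 retries, 1s initial, x2, capped at 4s
        System.out.println(intervals(1000L, 2.0, 4000L, 2)); // [1000, 2000]
        // With more retries the cap becomes visible
        System.out.println(intervals(1000L, 2.0, 4000L, 5)); // [1000, 2000, 4000, 4000, 4000]
    }
}
```

With only two retries the 4000 ms cap is never reached; it matters once the number of retries grows.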

We can also configure a RetryListener to receive notifications of every retry and recovery attempt.

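public DefaultErrorHandler errorHandler() {

        // Any BackOff can be used here; this one waits 1 second between 2 retries
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2));

        // Invoked after each failed delivery attempt
        errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -> {
            log.info("Failed Record in Retry Listener, Exception: {}, deliveryAttempt: {}",
                    ex.getMessage(), deliveryAttempt);
        });

        return errorHandler;
}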

DefaultErrorHandler provides addNotRetryableExceptions() to exclude exceptions that cannot be recovered (such as DeserializationException or NullPointerException); retries are skipped for them. Alternatively, the addRetryableExceptions() method lets us mark specific exceptions as retryable.

public DefaultErrorHandler errorHandler() {

        DefaultErrorHandler errorHandler = new DefaultErrorHandler();
        // Exceptions listed here skip the retries
        var exceptionsToIgnoreList = List.of(IllegalArgumentException.class);
        exceptionsToIgnoreList.forEach(errorHandler::addNotRetryableExceptions);
        return errorHandler;
}
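The decision behind a not-retryable list can be sketched in a few lines of plain Java. This is a simplified model rather than DefaultErrorHandler's implementation: the isRetryable helper is hypothetical, and walking the whole cause chain is an assumption of this sketch.

```java
import java.util.Set;

final class RetryClassifierSketch {

    // Returns false as soon as the failure, or any of its causes, is of a not-retryable type.
    static boolean isRetryable(Throwable failure, Set<Class<? extends Throwable>> notRetryable) {
        for (Throwable current = failure; current != null; current = current.getCause()) {
            for (Class<? extends Throwable> type : notRetryable) {
                if (type.isInstance(current)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Class<? extends Throwable>> notRetryable = Set.of(IllegalArgumentException.class);

        Throwable wrapped = new RuntimeException("listener failed", new IllegalArgumentException("bad payload"));
        System.out.println(isRetryable(wrapped, notRetryable));                              // false
        System.out.println(isRetryable(new IllegalStateException("timeout"), notRetryable)); // true
    }
}
```

Checking the causes matters because the exception thrown by listener code is often wrapped in another exception before an error handler sees it.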

Finally, we set the error handler on our listener container factory.

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(...) {

        //...
        factory.setCommonErrorHandler(errorHandler());
        return factory;
}

6.8. Message Recovery

The DefaultErrorHandler allows configuring a recoverer that is invoked when the maximum number of retry attempts is reached without success. We can pass any ConsumerRecordRecoverer for custom handling, or use the DeadLetterPublishingRecoverer implementation, which publishes failed messages to another topic.

Using DeadLetterPublishingRecoverer

@Value("${topics.retry}")
private String retryTopic;

@Value("${topics.dlt}")
private String deadLetterTopic;

@Autowired
private KafkaTemplate kafkaTemplate;

public DeadLetterPublishingRecoverer publishingRecoverer() {

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
                (r, e) -> {
                    if (e.getCause() instanceof RecoverableDataAccessException) {
                        return new TopicPartition(retryTopic, r.partition());
                    }
                    else {
                        return new TopicPartition(deadLetterTopic, r.partition());
                    }
                });
        return recoverer;
}

public DefaultErrorHandler errorHandler() {

        //....
        var errorHandler = new DefaultErrorHandler(
                publishingRecoverer(),
                exponentialBackOff
        );
        //....
        return errorHandler;
 }

Using ConsumerRecordRecoverer

ConsumerRecordRecoverer consumerRecordRecoverer = (record, exception) -> {

        log.error("Exception is : {} Failed Record : {} ", exception, record);
        if (exception.getCause() instanceof RecoverableDataAccessException) {
            log.info("Inside the recoverable logic");
            //Add any Recovery Code here.
            kafkaTemplate.send(retryTopic, record.partition(), record.key(), record.value());

        } else {
            log.info("Inside the non recoverable logic and skipping the record : {}", record);
            kafkaTemplate.send(deadLetterTopic, record.partition(), record.key(), record.value());

        }
};

public DefaultErrorHandler errorHandler() {

        //....
        var errorHandler = new DefaultErrorHandler(
                consumerRecordRecoverer,
                exponentialBackOff
        );
        //....
        return errorHandler;
 }
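Both recoverers above make the same routing decision: inspect the cause of the failure and pick a destination topic. The sketch below isolates that decision in plain Java so it can be tested without Kafka. Everything in it is hypothetical: TransientFailureException stands in for RecoverableDataAccessException, and the topic names are placeholders for the values of topics.retry and topics.dlt.

```java
final class RecoveryRoutingSketch {

    // Hypothetical marker for failures that are worth retrying later.
    static class TransientFailureException extends RuntimeException {
        TransientFailureException(String message) {
            super(message);
        }
    }

    // Placeholder topic names, not values taken from the article's configuration.
    static final String RETRY_TOPIC = "inventory-events.retry";
    static final String DEAD_LETTER_TOPIC = "inventory-events.dlt";

    // Picks the destination topic from the cause of the exception handed to the recoverer.
    static String destinationFor(Exception exception) {
        return exception.getCause() instanceof TransientFailureException
                ? RETRY_TOPIC
                : DEAD_LETTER_TOPIC;
    }

    public static void main(String[] args) {
        Exception recoverable = new RuntimeException("listener failed", new TransientFailureException("timeout"));
        Exception fatal = new RuntimeException("listener failed", new IllegalArgumentException("bad payload"));

        System.out.println(destinationFor(recoverable)); // inventory-events.retry
        System.out.println(destinationFor(fatal));       // inventory-events.dlt
    }
}
```

Keeping the decision in a small function like this makes it straightforward to unit test the routing rules separately from the Kafka wiring.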

7. Conclusion

This Spring Boot Kafka tutorial covered setting up a local Kafka broker and integrating Apache Kafka with a Spring Boot application. It covered topics such as configuring the Kafka producer and consumer, producing and consuming messages, handling Kafka errors, and implementing retries. By following the step-by-step instructions and leveraging the Spring Kafka framework, developers can effectively integrate Kafka into their projects.

Happy Learning !!
