Spring Boot and Kafka using Docker Compose

Learn to create a Spring Boot application, run a local Apache Kafka broker using Docker Compose, send messages with KafkaTemplate, and consume them with @KafkaListener.

1. Goals

In this tutorial, we will learn to:

  • Create a Spring Boot application with Kafka dependencies.
  • Set up and run a local Kafka broker using Spring Boot's built-in Docker Compose module.
  • Configure Kafka properties for the message producer and listener.
  • Use KafkaTemplate to send messages to a topic.
  • Use @KafkaListener to listen to messages sent to the topic in real time.

2. Maven

Open Spring Initializr and create a Spring Boot application with the following dependencies:

  • Spring for Apache Kafka
  • Spring Web
  • Docker Compose Support (optional). It is needed only if you plan to run a local Kafka broker in a Docker container.

The generated project has the following dependencies in pom.xml.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-docker-compose</artifactId>
	<optional>true</optional>
</dependency>

3. Docker Compose File for the Kafka Broker

Follow this step if you plan to use a local Kafka broker running in a Docker container on your machine. Create a docker-compose.yaml file in the application root with the following configuration, and adjust it to your specific requirements.

The following configuration creates a single-node Kafka setup with one Zookeeper instance and one broker. The depends_on entry ensures that the Zookeeper container is started before the Kafka broker container and stopped after it.

The best part is that the spring-boot-docker-compose module automatically detects this file, pulls the necessary images, and starts the containers during application startup, before the application begins accepting requests.

version: '3'
services:
  zookeeper:
    # Pinned to a release line that still supports ZooKeeper mode; newer cp-kafka images are KRaft-only.
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "22181:2181"

  broker:
    image: confluentinc/cp-kafka:7.5.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
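
The Docker Compose support can also be tuned through application properties. The fragment below is a small sketch, assuming the file name used in this tutorial; the start-only value is just one possible choice, which leaves the containers running when the application stops so that restarts are faster.

```properties
# Point Spring Boot at the compose file explicitly (by default it is detected in the working directory)
spring.docker.compose.file=docker-compose.yaml

# Start the containers with the application but do not stop them on shutdown
spring.docker.compose.lifecycle-management=start-only
```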

4. Kafka Configuration

Start with a small set of configuration changes and understand what each one provides.

4.1. Minimal Configuration

Kafka configuration is controlled by properties with the prefix spring.kafka.*. For example, the following property tells the auto-configured KafkaTemplate and the other Kafka beans which broker to connect to.

spring.kafka.bootstrap-servers=localhost:9092

4.2. Create a New Topic

To create a new topic in the Kafka broker when it does not already exist, declare a NewTopic bean as follows:

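import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfig {

  // Same topic name that the producer and listener use later in this tutorial
  @Bean
  public NewTopic taskTopic() {
    return TopicBuilder.name("general-task-topic")
        .partitions(1)
        .replicas(1)
        .build();
  }
}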

4.3. Custom Key-Value Serializer and Deserializer

By default, Spring Boot configures String serializers and deserializers for message keys and values. We can also send a custom Java type as the message value after configuring the appropriate serializer and deserializer.

Suppose the message value to send is an instance of the TaskStatus class.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskStatus implements Serializable {

  private String taskId;
  private String taskName;
  private float percentageComplete;
  private Status status;

  public enum Status {
    SUBMITTED, STARTED, RUNNING, FINISHED, TERMINATED
  }
}
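
The annotations above come from Lombok, which does not appear in the dependency list shown earlier and would need to be added to the project separately. If Lombok is not available, a hand-written equivalent could look like the following sketch. It is an illustration rather than Lombok's generated code; the toString() format is chosen to match the console output shown in the demo section.

```java
import java.io.Serializable;
import java.util.Objects;

// Hand-written stand-in for the Lombok-annotated TaskStatus (a sketch, not generated code).
public class TaskStatus implements Serializable {

  public enum Status {
    SUBMITTED, STARTED, RUNNING, FINISHED, TERMINATED
  }

  private String taskId;
  private String taskName;
  private float percentageComplete;
  private Status status;

  // No-argument constructor, typically required by JSON libraries for deserialization.
  public TaskStatus() {
  }

  public TaskStatus(String taskId, String taskName, float percentageComplete, Status status) {
    this.taskId = taskId;
    this.taskName = taskName;
    this.percentageComplete = percentageComplete;
    this.status = status;
  }

  public String getTaskId() { return taskId; }
  public void setTaskId(String taskId) { this.taskId = taskId; }

  public String getTaskName() { return taskName; }
  public void setTaskName(String taskName) { this.taskName = taskName; }

  public float getPercentageComplete() { return percentageComplete; }
  public void setPercentageComplete(float percentageComplete) { this.percentageComplete = percentageComplete; }

  public Status getStatus() { return status; }
  public void setStatus(Status status) { this.status = status; }

  @Override
  public boolean equals(Object other) {
    if (this == other) return true;
    if (!(other instanceof TaskStatus)) return false;
    TaskStatus that = (TaskStatus) other;
    return Float.compare(percentageComplete, that.percentageComplete) == 0
        && Objects.equals(taskId, that.taskId)
        && Objects.equals(taskName, that.taskName)
        && status == that.status;
  }

  @Override
  public int hashCode() {
    return Objects.hash(taskId, taskName, percentageComplete, status);
  }

  @Override
  public String toString() {
    return "TaskStatus(taskId=" + taskId + ", taskName=" + taskName
        + ", percentageComplete=" + percentageComplete + ", status=" + status + ")";
  }
}
```

The getters, setters, and no-argument constructor are what allow a JSON library to read and populate the object, which matters for the serializer configuration that follows.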

We want to send the serialized value of TaskStatus as a JSON object so that it can be used by any consumer in a technology-agnostic manner.

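# consumer config

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# JsonDeserializer only deserializes classes from trusted packages. "*" trusts all packages,
# which is acceptable for a local demo; prefer the package that contains TaskStatus otherwise.
spring.kafka.consumer.properties.spring.json.trusted.packages=*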

# producer config

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

5. KafkaTemplate as Message Producer

To configure a Kafka message producer in the application, KafkaTemplate is an excellent choice because of its template-style API, similar to DynamoDbTemplate or RestTemplate.

The KafkaProducerService class uses an autowired KafkaTemplate to send messages to the specified topic name.

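import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerService.class);

  @Autowired
  KafkaTemplate<String, TaskStatus> kafkaTemplate;

  public void send(String topicName, String key, TaskStatus value) {

    // send() is asynchronous; the returned future completes once the send succeeds or fails
    var future = kafkaTemplate.send(topicName, key, value);

    future.whenComplete((sendResult, exception) -> {
      if (exception != null) {
        LOGGER.error("Failed to send task status to Kafka topic : {}", value, exception);
      } else {
        LOGGER.info("Task status sent to Kafka topic : {}", value);
      }
    });
  }
}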
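
To see how a whenComplete() callback receives either a result or an exception, the following stand-alone sketch uses plain CompletableFuture objects in place of the future returned by KafkaTemplate.send(). The class name, the payload strings, and the "offset 42" acknowledgement are hypothetical; no broker or Spring context is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-alone sketch: plain futures stand in for the result of KafkaTemplate.send().
class SendCallbackSketch {

  // Collected log lines, kept in a list so the outcome is easy to inspect.
  static final List<String> LOG = new ArrayList<>();

  // Attaches a completion callback in the same style as the producer service.
  static void logOutcome(CompletableFuture<String> future, String payload) {
    future.whenComplete((result, exception) -> {
      if (exception != null) {
        LOG.add("FAILED " + payload + ": " + exception.getMessage());
      } else {
        LOG.add("SENT " + payload + " -> " + result);
      }
    });
  }

  public static void main(String[] args) {
    // Simulates a successful send (hypothetical acknowledgement value).
    CompletableFuture<String> acknowledged = new CompletableFuture<>();
    logOutcome(acknowledged, "task-1");
    acknowledged.complete("offset 42");

    // Simulates a failed send.
    CompletableFuture<String> rejected = new CompletableFuture<>();
    logOutcome(rejected, "task-2");
    rejected.completeExceptionally(new IllegalStateException("broker unavailable"));

    LOG.forEach(System.out::println);
  }
}
```

Exactly one of the two callback arguments is non-null on each invocation, so checking the exception first is enough to separate the failure path from the success path.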

We can use this service to send a message as follows:

@Autowired
KafkaProducerService kafkaProducerService;

//Using service

kafkaProducerService.send("general-task-topic", taskStatus.getTaskId(), taskStatus);

6. @KafkaListener as Message Consumer

Similarly, the KafKaTopicListeners class uses @KafkaListener to receive messages from the specified topic for the given group id.

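import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafKaTopicListeners {

  private final Logger logger = LoggerFactory.getLogger(KafKaTopicListeners.class);

  @KafkaListener(topics = {"general-task-topic"}, groupId = "task-group")
  public void consume(TaskStatus taskStatus) {

    // Parameterized logging avoids treating the payload as a format string
    logger.info("Task status is updated : {}", taskStatus);
  }
}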

7. Demo

For demo purposes, we can use the CommandLineRunner interface to send a few messages and then check the listener class for verification of the Kafka setup.

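import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
// Also import TaskStatus from the package where you created it

@SpringBootApplication
public class Application implements CommandLineRunner {

  @Autowired
  KafkaTemplate<String, TaskStatus> kafkaTemplate;

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Override
  public void run(String... args) throws Exception {

    // Both messages share the same key and differ only in progress and status
    kafkaTemplate.send("general-task-topic", "taskId", new TaskStatus("taskId", "taskName", 50.0f, TaskStatus.Status.RUNNING));
    kafkaTemplate.send("general-task-topic", "taskId", new TaskStatus("taskId", "taskName", 100.0f, TaskStatus.Status.FINISHED));
  }
}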

Observe the console logs:

c.h.d.jms.service.KafKaTopicListeners    : Task status is updated : TaskStatus(taskId=taskId, taskName=taskName, percentageComplete=50.0, status=RUNNING)

c.h.d.jms.service.KafKaTopicListeners    : Task status is updated : TaskStatus(taskId=taskId, taskName=taskName, percentageComplete=100.0, status=FINISHED)
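
If the listener logs nothing on the very first run, one possible cause is that the new consumer group has no committed offset yet and starts reading from the end of the topic, after the demo messages were already written. Assuming that is the situation, the following property makes a new group start from the earliest available message instead.

```properties
# Applies only when the consumer group has no committed offset yet
spring.kafka.consumer.auto-offset-reset=earliest
```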

8. Conclusion

In this Spring Boot Kafka example, we learned to create a Spring Boot application, configure a Kafka broker instance, and use the KafkaTemplate and @KafkaListener support provided by Spring Boot auto-configuration.

Additionally, we verified the application by posting some messages using KafkaTemplate and then consuming the messages using @KafkaListener.

Happy Learning !!

Source Code on Github
