Spring Boot and Apache Kafka Example

Learn to create a Spring Boot application that connects to a given Apache Kafka broker instance, and to produce and consume messages from a Kafka topic.

1. Introduction

In this tutorial, we will learn to:

  • Create a Spring Boot application with Kafka dependencies
  • Configure the Kafka broker instance in application.yaml
  • Use KafkaTemplate to send messages to a topic
  • Use @KafkaListener to listen to messages sent to the topic in real time

1.1. Prerequisites

  • Please follow this guide to set up Kafka on your machine.
  • We are creating a Maven-based Spring Boot application, so your machine needs at least Java 8 and Maven installed.

2. Maven

Open Spring Initializr and create a Spring Boot application with the following dependencies:

  • Spring for Apache Kafka
  • Spring Web

Create Spring Boot Kafka application

The generated project has the following dependencies in pom.xml.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Import the project into your preferred IDE.

3. Configure Kafka Broker

In the application.yaml file, add the Kafka broker address along with the consumer and producer configuration.

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
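
With the String serializer and deserializer configured above, keys and values travel to and from the broker as bytes, and the Kafka String classes use UTF-8 by default. The following is a minimal JDK-only sketch of that round trip. The class StringSerdeSketch and its methods are hypothetical helpers written for illustration; they are not the Kafka classes themselves.

```java
import java.nio.charset.StandardCharsets;

// JDK-only illustration; these helpers are hypothetical and are not the Kafka classes.
final class StringSerdeSketch {

    // Roughly what a String serializer does by default: encode the text as UTF-8 bytes.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Roughly what a String deserializer does by default: decode UTF-8 bytes back to text.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = serialize("Alphabet");
        System.out.println("Payload size in bytes: " + payload.length);
        System.out.println("Round trip: " + deserialize(payload));
    }
}
```

If the producer and consumer were configured with different serializer and deserializer types, the consumer would not be able to read back what the producer wrote, which is why the two sides are configured as a matching pair.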

4. KafKaProducerService and KafKaConsumerService

The KafKaProducerService class uses an autowired KafkaTemplate to send messages to the configured topic.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate; 
import org.springframework.stereotype.Service;
import com.howtodoinjava.kafka.demo.common.AppConstants;
 
@Service
public class KafKaProducerService 
{
    private static final Logger logger = 
            LoggerFactory.getLogger(KafKaProducerService.class);
     
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
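    public void sendMessage(String message) 
    {
        // SLF4J placeholder avoids building the string when INFO logging is disabled
        logger.info("Message sent -> {}", message);
        // Sends without a key; the send is asynchronous and its result is not awaited here
        this.kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }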
}

Similarly, the KafKaConsumerService class uses @KafkaListener to receive messages from the configured topic.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.howtodoinjava.kafka.demo.common.AppConstants;
 
@Service
public class KafKaConsumerService 
{
    private static final Logger logger = 
            LoggerFactory.getLogger(KafKaConsumerService.class);
 
    // Invoked for each message that arrives on the topic for this consumer group
    @KafkaListener(topics = AppConstants.TOPIC_NAME, 
            groupId = AppConstants.GROUP_ID)
    public void consume(String message) 
    {
        logger.info("Message received -> {}", message);
    }
}

Both services take the topic name and the consumer group ID from a shared AppConstants class.

public class AppConstants
{
	public static final String TOPIC_NAME = "test";
	public static final String GROUP_ID = "group_id";
}
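
The producer and consumer services never call each other directly; the topic sits between them. As a rough analogy only, the sketch below mimics that hand-off with an in-memory queue from the JDK. The class ProduceConsumeSketch and its methods are hypothetical, and a real Kafka topic behaves differently in important ways (messages are persisted on the broker, and consumer groups and offsets decide who reads what).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory analogy only: a queue stands in for the topic; no Kafka classes are involved.
final class ProduceConsumeSketch {

    // Hypothetical stand-in for the "test" topic.
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // Plays the role of sendMessage(): hand the message over and return immediately.
    static void send(String message) {
        topic.add(message);
    }

    // Plays the role of the listener: wait up to one second for the next message.
    // Returns null if nothing arrives in time or the thread is interrupted.
    static String receive() {
        try {
            return topic.poll(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The listener runs on its own thread, separate from the sender.
        Thread listener = new Thread(
                () -> System.out.println("Message received -> " + receive()),
                "listener-thread");
        listener.start();
        send("Alphabet");
        listener.join();
    }
}
```

In the Spring application the same separation applies: the REST request thread calls sendMessage(), while a listener container thread invokes consume().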

5. REST Controller

The controller accepts a message from the user through a REST API and hands it to the producer service, which publishes it to the Kafka topic.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.howtodoinjava.kafka.demo.service.KafKaProducerService;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaProducerController
{
	private final KafKaProducerService producerService;

	@Autowired
	public KafkaProducerController(KafKaProducerService producerService)
	{
		this.producerService = producerService;
	}

	@PostMapping(value = "/publish")
	public void sendMessageToKafkaTopic(@RequestParam("message") String message)
	{
		this.producerService.sendMessage(message);
	}
}

6. Demo

Use any REST API tester to post a few messages to http://localhost:9000/kafka/publish, passing the text in the "message" query parameter.

Example request: POST http://localhost:9000/kafka/publish?message=Alphabet
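
If you prefer to send the request from code instead of a REST API tester, the following sketch uses the HTTP client that ships with the JDK. It assumes Java 11 or later (the tutorial itself only requires Java 8) and that the application is running locally on port 9000. The class PublishClientSketch and its method names are hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /kafka/publish endpoint; assumes Java 11+.
final class PublishClientSketch {

    // Endpoint from the tutorial: server port 9000, controller path /kafka/publish.
    static final String PUBLISH_URL = "http://localhost:9000/kafka/publish";

    // Builds a POST request that carries the text in the "message" query parameter.
    static HttpRequest buildPublishRequest(String message) {
        // Encode the value so spaces and special characters survive in the URL.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(PUBLISH_URL + "?message=" + encoded))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the Spring Boot application from this tutorial to be running.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildPublishRequest("Alphabet"), HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

Building the request in its own method keeps the URL encoding easy to check without a running server.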

Observe the console logs:

2020-05-24 23:36:47.132  INFO 2092 --- [nio-9000-exec-4]
c.h.k.demo.service.KafKaProducerService  : Message sent -> Alphabet

2020-05-24 23:36:47.138  INFO 2092 --- [ntainer#0-0-C-1]
c.h.k.demo.service.KafKaConsumerService  : Message received -> Alphabet

If you have opened the Kafka console consumer already in the command prompt, you will see the message appear there as well.

Kafka console consumer

7. Conclusion

In this Spring Boot Kafka tutorial, we learned to create a Spring Boot application and configure the Kafka broker.

Additionally, we verified the application by posting some messages using KafkaTemplate and then consuming the messages using @KafkaListener.

Happy Learning !!
