Learn to create a Spring Boot application that connects to a given Apache Kafka broker instance. Also, learn to produce and consume messages from a Kafka topic.
Steps we will follow:
- Create a Spring Boot application with Kafka dependencies
- Configure the Kafka broker instance in `application.yaml`
- Use `KafkaTemplate` to send messages to a topic
- Use `@KafkaListener` to listen to messages sent to the topic in real time
1. Prerequisites
- Please follow this guide to set up Kafka on your machine.
- We are creating a Maven-based Spring Boot application, so your machine should have at least Java 8 and Maven installed.
2. Spring Boot application
Open Spring Initializr and create a Spring Boot application with the following dependencies:
- Spring for Apache Kafka
- Spring Web

The generated project has the following dependencies in pom.xml:

```xml
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
      <exclusion>
        <groupId>org.junit.vintage</groupId>
        <artifactId>junit-vintage-engine</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
```
Import the project into your preferred IDE.
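Spring Initializr also generates the main class that bootstraps the application; you normally do not need to change it. For reference, it looks roughly like this sketch (the class name KafkaDemoApplication is an assumption; the package matches the ones used by the classes later in this tutorial):

```java
package com.howtodoinjava.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Assumed class name; Spring Initializr names this class after your artifact id.
@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
```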
3. Configure Kafka broker
In the application.yaml file, add the Kafka broker address as well as the consumer- and producer-related configuration. The consumer is configured with key and value deserializers, while the producer uses the matching serializers.

```yaml
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
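The application publishes to a topic named test (defined as AppConstants.TOPIC_NAME in the next section), so that topic must exist on the broker. If you are on spring-kafka 2.3 or newer, you can optionally let the application create it at startup by declaring a NewTopic bean, which Spring Boot's auto-configured KafkaAdmin picks up. A minimal sketch, with an assumed class and package name:

```java
package com.howtodoinjava.kafka.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class; creates the "test" topic on startup if it is missing.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")   // same value as AppConstants.TOPIC_NAME
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```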
4. KafKaProducerService and KafKaConsumerService
The `KafKaProducerService` class uses an autowired `KafkaTemplate` to send messages to the configured topic name. Similarly, the `KafKaConsumerService` class uses `@KafkaListener` to receive messages from the configured topic name.
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.howtodoinjava.kafka.demo.common.AppConstants;

@Service
public class KafKaProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafKaProducerService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("Message sent -> %s", message));
        this.kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }
}
```
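KafkaTemplate.send() is asynchronous, so the log line above only confirms that the record was handed to the template, not that the broker accepted it. If you want that confirmation, a hedged variant of sendMessage() can attach a callback to the returned future. This sketch assumes spring-kafka 2.x, where send() returns a ListenableFuture (3.x returns a CompletableFuture instead), and the method name sendMessageWithCallback is made up for illustration:

```java
// Additional imports needed in KafKaProducerService for this variant:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;

public void sendMessageWithCallback(String message) {
    ListenableFuture<SendResult<String, String>> future =
            this.kafkaTemplate.send(AppConstants.TOPIC_NAME, message);

    future.addCallback(
            // Success: the broker acknowledged the record; log where it landed.
            result -> logger.info("Sent '{}' to partition {} at offset {}",
                    message,
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset()),
            // Failure: the send could not be completed.
            ex -> logger.error("Failed to send '{}'", message, ex));
}
```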
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.howtodoinjava.kafka.demo.common.AppConstants;

@Service
public class KafKaConsumerService {

    private final Logger logger = LoggerFactory.getLogger(KafKaConsumerService.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
    public void consume(String message) {
        logger.info(String.format("Message received -> %s", message));
    }
}
```
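If you also want to know which partition a record came from, the listener method can bind record metadata through @Header. The sketch below is a possible replacement for consume() (replace rather than add a second listener, since both would share the same consumer group); it assumes spring-kafka 2.x, where the header constant is KafkaHeaders.RECEIVED_PARTITION_ID:

```java
// Additional imports needed in KafKaConsumerService for this variant:
// import org.springframework.kafka.support.KafkaHeaders;
// import org.springframework.messaging.handler.annotation.Header;

@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
public void consume(String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    // Logs the payload along with the partition the record was read from.
    logger.info("Message received -> {} (partition {})", message, partition);
}
```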
```java
public class AppConstants {

    public static final String TOPIC_NAME = "test";
    public static final String GROUP_ID = "group_id";
}
```
5. Controller
The controller is responsible for receiving the message from the user via the REST API and handing it over to the producer service, which publishes it to the Kafka topic.
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.howtodoinjava.kafka.demo.service.KafKaProducerService;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaProducerController {

    private final KafKaProducerService producerService;

    @Autowired
    public KafkaProducerController(KafKaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producerService.sendMessage(message);
    }
}
```
6. Test
Use any REST API tester and post a few messages to the API http://localhost:9000/kafka/publish with the query parameter "message".

Message post: http://localhost:9000/kafka/publish?message=Alphabet
Observe the console logs:
```
2020-05-24 23:36:47.132  INFO 2092 --- [nio-9000-exec-4] c.h.k.demo.service.KafKaProducerService : Message sent -> Alphabet
2020-05-24 23:36:47.138  INFO 2092 --- [ntainer#0-0-C-1] c.h.k.demo.service.KafKaConsumerService : Message received -> Alphabet
```
If you have already opened the Kafka console consumer in a command prompt, you will see the message appear there as well.
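Because the generated pom.xml already includes spring-kafka-test, you can also exercise the setup without a locally running broker by using an embedded one. The test below is a minimal sketch (the class name and the 10-second timeout are assumptions): it points the producer and consumer at the embedded broker and blocks until the broker acknowledges a record.

```java
package com.howtodoinjava.kafka.demo;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

import com.howtodoinjava.kafka.demo.common.AppConstants;

// Point the producer and consumer at the broker started by @EmbeddedKafka
// instead of the localhost:9092 instance from application.yaml.
@SpringBootTest(properties = {
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
@EmbeddedKafka(partitions = 1, topics = AppConstants.TOPIC_NAME)
class KafkaDemoApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void sendsMessageToEmbeddedBroker() throws Exception {
        // Blocks until the embedded broker acknowledges the record, or fails after 10 seconds.
        kafkaTemplate.send(AppConstants.TOPIC_NAME, "Alphabet").get(10, TimeUnit.SECONDS);
    }
}
```

Since the consumer bootstrap servers are overridden as well, the @KafkaListener in KafKaConsumerService should also connect to the embedded broker and log the received message during the test run.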

7. Conclusion
In this Spring Boot Kafka tutorial, we learned to create a Spring Boot application and configure the Kafka broker. Additionally, we verified the application by posting some messages using KafkaTemplate and then consuming them using @KafkaListener.
Happy Learning !!