Spring Boot and Kafka Streams Example

In this Apache Kafka tutorial, we will learn to configure and create a Kafka Streams application using Spring Boot. We will also build a stream processing pipeline and write test cases to verify it.

1. Introduction to Kafka Streams

The Kafka Streams library is a robust stream processing tool used to enrich data by performing various operations such as data transformation, data aggregation, and joining data from multiple Kafka topics. Kafka Streams uses functional programming techniques, like lambda expressions, mapping, filtering, and flatMap, which are commonly found in modern Java programming.

Kafka Streams supports stateless and stateful operations, allowing us to maintain and update state while processing streams. This stateful processing capability is advantageous when dealing with complex event processing or when we need to correlate and analyze data across different streams. For example, we can calculate the real-time total number of orders, total revenue, or total number of tickets sold.

Kafka Streams provides a high-level DSL (Domain-Specific Language) and a low-level Processor API. The DSL offers a declarative, easy-to-use approach, whereas the Processor API gives fine-grained control and allows custom logic to be implemented.
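
For illustration, a minimal stateless pipeline written with the DSL could look like the following sketch. The topic names used here are placeholders and are not part of the application built later in this tutorial.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class SimpleDslExample {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Read text records, drop blank values and upper-case the rest
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .filter((key, value) -> value != null && !value.isBlank())
                .mapValues(value -> value.toUpperCase())
                .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}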

2. What is a Processor Topology?

The processor topology represents the structure of the stream processing application. It defines how the input data streams are read, transformed, processed, and produced. It consists of a directed acyclic graph (DAG) of processing nodes, where each node represents a processing step. The topology defines the flow of data and the relationships between different processing stages.

Kafka Streams uses three types of processors:

Initially, source processors receive data from one or more Kafka topics and deserialize it before forwarding it to the stream processor nodes, which contain the main transformation or computation logic. Finally, sink processors write the processed data to one or more output Kafka topics or to a state store.
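
Once a topology has been defined on a StreamsBuilder, we can inspect its processor nodes and the topics that connect them with Topology.describe(). The short snippet below assumes streamsBuilder is an existing StreamsBuilder with the processors already wired up.

// Build the topology and print its DAG of source, stream and sink processors
Topology topology = streamsBuilder.build();
System.out.println(topology.describe());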

3. Setting up Kafka Streams in Spring Boot Application

3.1. Maven

To use Kafka Streams in a Spring Boot application, we add the spring-kafka and kafka-streams dependencies.

<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
</dependency>

We also need the spring-kafka-test, kafka-streams-test-utils and awaitility dependencies for writing unit and integration tests.

<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
</dependency>
<dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
</dependency>

3.2. Configuration

The @EnableKafkaStreams annotation auto-configures and enables the default Kafka Streams components. It automatically imports the KafkaStreamsDefaultConfiguration class, which registers a StreamsBuilderFactoryBean using the KafkaStreamsConfiguration class.

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

}

The KafkaStreamsConfiguration connects to the bootstrap servers specified by the spring.kafka.streams.bootstrap-servers property. We also need to specify the application-id, which acts as the consumer group name for the streams application.

spring:
  kafka:
    streams:
      bootstrap-servers: localhost:9092
      application-id: order-streams-app
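
Alternatively, if we need settings that the spring.kafka.streams.* properties do not expose, we can declare our own KafkaStreamsConfiguration bean under the default bean name and Spring will use it instead of the auto-configured one. The sketch below is only an illustration; the class name and property values are assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.KafkaStreamsDefaultConfiguration;

@Configuration
@EnableKafkaStreams
public class CustomStreamsConfig {

    // The bean name must match the default config bean name so it overrides the auto-configuration
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreamsConfiguration(props);
    }
}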

4. Kafka Streams in Action

4.1. Prerequisites

The following docker-compose.yml creates a single-node Kafka setup with one Zookeeper and one broker instance. The configuration also ensures that the Zookeeper server always starts before the Kafka broker and stops after it.

If a more resilient Kafka setup is needed, we can extend this Docker Compose configuration with additional brokers and adjust the settings based on specific requirements.

---
version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    platform: linux/amd64
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:latest
    platform: linux/amd64
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
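
The application also needs its input and output topics. We can create them with the Kafka CLI, or let Spring create them at startup by declaring NewTopic beans. A minimal sketch is shown below, assuming the topic names orders and orders-output; the actual values behind the ORDERS and ORDERS_OUTPUT constants used later are not shown in this article.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Single-partition topics with replication factor 1 are enough for this single-broker setup
    @Bean
    NewTopic ordersTopic() {
        return TopicBuilder.name("orders").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic ordersOutputTopic() {
        return TopicBuilder.name("orders-output").partitions(1).replicas(1).build();
    }
}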

4.2. Topology Design

Spring Boot offers a lightweight abstraction over the Streams API and manages the lifecycle of our KStream instance. It handles the creation and configuration of the necessary components for the topology and executes our Streams application. This allows us to concentrate on our primary business logic, while Spring effectively handles the lifecycle management.

Now let’s build the topology to count the total number of orders sold for a given product.

In our example, we’ve used the high-level DSL to define the transformations:

  • At first, we create a KStream from the input topic using the specified key and value SerDes. (Apache Kafka provides a Serde interface, which wraps the serializer and deserializer for a data type. Kafka ships implementations for several common data types such as String, Long and JSON. In our code, we use the String and JSON Serdes for the key and value, respectively.)
  • We use selectKey(..) to set a new key for each input record. Then, using print(..) and Printed(..), we print the KStream data in the logs with the specified label.
  • Then, we create a KTable by transforming, grouping, and counting the data.
  • Finally, we produce the result to an output stream (Kafka topic).

@Component
@Slf4j
public class OrdersTopology {

	@Autowired
	public void process(StreamsBuilder streamsBuilder) {

	  KStream<String, Order> orderStreams = streamsBuilder
	    .stream(ORDERS,
	            Consumed.with(Serdes.String(), new JsonSerde<>(Order.class))
	    )
	    .selectKey((key, value) -> value.productId());

	  orderStreams
	    .print(Printed.<String, Order>toSysOut().withLabel("orders"));

	  KTable<String, Long> ordersCount = orderStreams
	    .map((key, order) -> KeyValue.pair(order.productId(), order))
	    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
	    .count(Named.as(OrdersTopology.ORDERS_COUNT));

	  ordersCount
	    .toStream()
	    .print(Printed.<String,Long>toSysOut().withLabel(OrdersTopology.ORDERS_COUNT));

	  ordersCount
	    .toStream()
	    .to(ORDERS_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
	}

	//...
}

Here, we’ve annotated the method with the @Autowired annotation so that Spring injects a matching bean from the container into the StreamsBuilder parameter.
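
For reference, the Order type used in the topology is not listed in the article. It can be modeled as a simple Java record; the field list below is an assumption, since the topology only relies on productId(). Similarly, ORDERS, ORDERS_OUTPUT and ORDERS_COUNT are assumed to be String constants on OrdersTopology holding the input topic, output topic and state store names.

import java.math.BigDecimal;

// Assumed event model; only productId() is required by the topology shown above
public record Order(String orderId, String productId, BigDecimal amount) {
}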

4.3. State Store

Optionally, instead of publishing the data to an output Kafka topic, we can materialize the result into a state store, which allows us to query and fetch the data directly instead of writing consumer code for the output topic. We can either provide a custom StateStore or use the default RocksDB store by providing just a store name.

RocksDB is an embedded state store, meaning writes do not involve a network call. This keeps latency low and ensures the store does not become a bottleneck or an additional point of failure in the stream processing. It is a key/value store in which records are persisted as their byte representation, resulting in fast reads and writes. It is a persistent rather than in-memory state store, with writes flushed to disk asynchronously.

In our case, we will use RocksDB to store the result. So let’s modify the KTable from earlier and materialize the aggregated order count as a local state store.

@Autowired
public void process(StreamsBuilder streamsBuilder) {

         //...

        KTable<String, Long> ordersCount = orderStreams
                .map((key, order) -> KeyValue.pair(order.productId(), order))
                .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
                .count(Named.as(OrdersTopology.ORDERS_COUNT),
                        Materialized.as(OrdersTopology.ORDERS_COUNT));

       //...
}
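
Materialized.as(storeName) uses the default RocksDB-backed store. If we prefer a purely in-memory store instead, we could pass an explicit store supplier; the sketch below shows that variation and is illustrative only.

// In-memory alternative to the default RocksDB-backed store (illustrative only)
KTable<String, Long> ordersCount = orderStreams
        .map((key, order) -> KeyValue.pair(order.productId(), order))
        .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
        .count(Named.as(OrdersTopology.ORDERS_COUNT),
                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore(OrdersTopology.ORDERS_COUNT))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));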

Using a state store allows us to easily write a REST API to fetch the data, as shown below:

@Autowired
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;

@GetMapping("/count")
public List<OrdersCountPerStoreDTO> ordersCount() {

        ReadOnlyKeyValueStore<String, Long> ordersStoreData = streamsBuilderFactoryBean.getKafkaStreams()
                .store(StoreQueryParameters.fromNameAndType(
                        ORDERS_COUNT,
                        QueryableStoreTypes.keyValueStore()
                ));

        var orders = ordersStoreData.all();

        var spliterator = Spliterators.spliteratorUnknownSize(orders, 0);

        return StreamSupport.stream(spliterator, false)
                .map(data -> new OrdersCountPerStoreDTO(data.key, data.value))
                .collect(Collectors.toList());
}

In the controller, we begin by autowiring the StreamsBuilderFactoryBean. This grants us access to the KafkaStreams instance managed by the factory bean. Subsequently, we use KafkaStreams to fetch the results from the ORDERS_COUNT key/value state store that backs the KTable.
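
The OrdersCountPerStoreDTO response type is not shown in this article; a plain record carrying the store key and its count is sufficient. The field names below are assumptions, though the count() accessor matches how the DTO is used in the integration test later.

// Assumed shape of the response DTO returned by the /count endpoint
public record OrdersCountPerStoreDTO(String productId, Long count) {
}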

5. Testing

5.1. Unit Test

Let’s write a unit test for our topology using the TopologyTestDriver. This class makes it easier to write tests that verify the behavior of topologies created with StreamsBuilder.

First, we instantiate a StreamsBuilder and pass it to an instance of OrdersTopology, which contains our business logic. Then, we instantiate the TopologyTestDriver by providing it the built Topology (streamsBuilder.build()). Finally, we use the driver to create an input topic specifically for testing purposes.

class OrdersTopologyTest {

    TopologyTestDriver topologyTestDriver = null;
    TestInputTopic<String, Order> ordersInputTopic = null;

    static String INPUT_TOPIC = ORDERS;
    StreamsBuilder streamsBuilder;

    OrdersTopology ordersTopology = new OrdersTopology();

    @BeforeEach
    void setUp() {
        streamsBuilder = new StreamsBuilder();
        ordersTopology.process(streamsBuilder);
        topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());

        ordersInputTopic =
                topologyTestDriver.
                        createInputTopic(
                                INPUT_TOPIC, Serdes.String().serializer(),
                                new JsonSerde<Order>(Order.class).serializer());
    }

    @AfterEach
    void tearDown() {
        topologyTestDriver.close();
    }

    @Test
    void ordersCount() {

        ordersInputTopic.pipeKeyValueList(orders());

        ReadOnlyKeyValueStore<String, Long> ordersCountStore = topologyTestDriver.getKeyValueStore(ORDERS_COUNT);

        var product1234OrdersCount = ordersCountStore.get("product_1234");
        assertEquals(2, product1234OrdersCount);

        var product4567OrdersCount = ordersCountStore.get("product_4567");
        assertEquals(2, product4567OrdersCount);

    }

    static List<KeyValue<String, Order>> orders() {
        // create some orders
    }

}
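
The orders() helper is elided above. A minimal sketch that satisfies the assertions (two orders each for product_1234 and product_4567), assuming the Order record sketched earlier, could look like this:

static List<KeyValue<String, Order>> orders() {

    var order1 = new Order("order_1", "product_1234", new BigDecimal("100.00"));
    var order2 = new Order("order_2", "product_1234", new BigDecimal("150.00"));
    var order3 = new Order("order_3", "product_4567", new BigDecimal("200.00"));
    var order4 = new Order("order_4", "product_4567", new BigDecimal("250.00"));

    // The record key is irrelevant here because the topology re-keys on productId
    return List.of(
            KeyValue.pair(order1.productId(), order1),
            KeyValue.pair(order2.productId(), order2),
            KeyValue.pair(order3.productId(), order3),
            KeyValue.pair(order4.productId(), order4));
}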

5.2. Integration Test

We use EmbeddedKafka and TestPropertySource to set up the environment for the integration test. Additionally, we use Awaitility to make it easier to test asynchronous code.

@SpringBootTest
@EmbeddedKafka(topics = {ORDERS})
@TestPropertySource(properties = {
        "spring.kafka.streams.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.streams.auto-startup=false"
})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class OrdersTopologyIntegrationTest {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    OrderService orderService;

    @BeforeEach
    public void setUp() {
        streamsBuilderFactoryBean.start();
    }

    @AfterEach
    public void destroy() {
        streamsBuilderFactoryBean.getKafkaStreams().close();
        streamsBuilderFactoryBean.getKafkaStreams().cleanUp();
    }

    @Test
    void ordersCount() {
        publishOrders();

        Awaitility.await().atMost(10, SECONDS)
                .pollDelay(Duration.ofSeconds(1))
                .ignoreExceptions()
                .until(() -> orderService.ordersCount().size(), equalTo(1));

        var ordersCount = orderService.ordersCount();
        assertEquals(1, ordersCount.get(0).count());

    }

    private void publishOrders() {
        orders()
                .forEach(order -> {
                    String orderJSON = null;
                    try {
                        orderJSON = objectMapper.writeValueAsString(order.value);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                    kafkaTemplate.send(ORDERS, order.key, orderJSON);
                });
    }


    static List<KeyValue<String, Order>> orders() {
        // create some orders
    }
}

6. Conclusion

In conclusion, we learned about the key concepts of Kafka Streams, such as processor topology and the different types of processors involved in the stream processing pipeline. The article also provided a step-by-step guide on setting up Kafka Streams in a Spring Boot application, including the necessary dependencies and configuration.

Happy Learning !!

Source Code on GitHub
