Stream Processing with Redis Streams and Spring Boot

In this Spring Data tutorial, we will learn how to use Redis Streams with Spring Boot to implement real-time stream processing. As a prerequisite, you should have a basic knowledge of Redis and Spring Boot integration.

1. What is a Stream?

A stream is a data structure that acts like an append-only log. We use streams to record application events and simultaneously process them in real time. Each entry in the stream has a unique ID and a value. By default, the ID is generated automatically and includes a timestamp, while the value is a hash of field-value pairs.

Redis streams are ideal for building history-preserving message brokers, message queues, unified logs, and chat systems. Unlike Pub/Sub messages, which are fire-and-forget, Redis streams provide persistence. Redis streams also support consumer groups: only one consumer in a consumer group receives a given message, so there is no duplicate message processing.

2. Why Redis Streams?

In the microservices era, applications produce continuous, uninterrupted streams of data. The traditional practice was to collect data and perform batch processing at fixed intervals, which inherently delays the results. Delaying the processing of such data can have significant repercussions for businesses. For instance, consider applications like Netflix or YouTube, where immediate recommendations based on users’ browsing activity enhance the user experience and contribute to business success.

Another example where streams play a critical role: a bank might want to detect fraudulent activity and block the card immediately. Credit card providers cannot afford to delay such crucial actions until a nightly processing routine.

For such time-critical needs, we must process events in real time, as and when they occur. Redis Streams offers simple, in-memory stream processing, which is especially attractive when Redis is already being used in the application. Note that while Redis offers features such as stream creation, appending and consuming events, and acknowledging message delivery, it lacks advanced capabilities such as built-in windowing, event-time processing, or complex event processing.

For large-scale data processing that requires high throughput, fault tolerance, and scalability, it is advisable to use dedicated stream processing frameworks such as Apache Kafka or Apache Flink.

3. Redis Installation

For demonstration purposes, we are installing Redis locally. Find more information about Linux and macOS downloads here. Please note that Redis does not officially support Windows, but a port of the server is available here.

To install Redis as a Docker container, run the following command:

$ docker run --name my-redis -p 6379:6379  -d redis

4. Demo Overview

We are going to create two simple services:

  • stream-event-producer: publishes the purchase events to the stream key ‘purchase-events’.
  • stream-event-consumer: is notified when new events are published. There could be N subscribers, but only one consumer in the consumer group will receive a given message. In our case, we will have one consumer.

The following PurchaseEvent class defines the fields sent with each message. The fields can vary based on requirements.

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class PurchaseEvent {

    private String purchaseId;
    private String productId;
    private String productName;
    private int price;
    private int quantity;

}

5. Setting Up Redis with Spring Boot

Start by adding the spring-boot-starter-data-redis dependency, which includes the Lettuce client by default.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Next, we will set up a stream key to which the producer will send messages and from which the consumer will receive them. We add the key to the properties file.

stream.key=purchase-events
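
The producer and listener shown later autowire a RedisTemplate<String, String>. Depending on the Spring Boot version, the auto-configured RedisTemplate bean is typed as RedisTemplate<Object, Object>, so we may need to declare a String-typed template ourselves. Below is a minimal sketch, assuming a configuration class named RedisConfig and plain String serializers:

@Configuration
public class RedisConfig {

  // String-typed template so that keys, hash keys and values are stored as plain strings
  @Bean
  public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, String> template = new RedisTemplate<>();
    template.setConnectionFactory(connectionFactory);
    template.setKeySerializer(new StringRedisSerializer());
    template.setValueSerializer(new StringRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());
    template.setHashValueSerializer(new StringRedisSerializer());
    return template;
  }
}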

6. Message Producer

The RedisTemplate provides a simple API for sending arbitrary objects as messages. In the following example, the producer uses the opsForStream() method to publish a PurchaseEvent to the configured stream key.

@Service
@Slf4j
public class PurchaseStreamProducer {

  @Autowired
  private RedisTemplate<String, String> redisTemplate;

  @Value("${stream.key:purchase-events}")
  private String streamKey;

  public RecordId produce(PurchaseEvent purchaseEvent) throws JsonProcessingException {

    ObjectRecord<String, PurchaseEvent> record = StreamRecords.newRecord()
        .ofObject(purchaseEvent)
        .withStreamKey(streamKey);

    RecordId recordId = this.redisTemplate.opsForStream()
        .add(record);

    if (Objects.isNull(recordId)) {
      log.info("error sending event: {}", purchaseEvent);
      return null;
    }

    return recordId;
  }
}
  • StreamRecords – Provides a fluent RecordBuilder API for creating stream records.
  • ObjectRecord<K, V> – A record backed by a plain object value, stored at the given stream key.
  • RecordId – The id of a single Record within a stream, composed of two parts: <millisecondsTime>-<sequenceNumber>.
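
To see the producer in action, we can call it from any entry point in the application. The following CommandLineRunner is only a sketch; the PurchaseEventSimulator class is a hypothetical helper and not part of the demo services:

@Component
@Slf4j
public class PurchaseEventSimulator implements CommandLineRunner {

  @Autowired
  private PurchaseStreamProducer purchaseStreamProducer;

  @Override
  public void run(String... args) throws Exception {

    PurchaseEvent event = PurchaseEvent.builder()
        .purchaseId(UUID.randomUUID().toString())
        .productId("12")
        .productName("IPhone 14")
        .price(74000)
        .quantity(1)
        .build();

    // publish the event to the configured stream and log the generated record id
    RecordId recordId = purchaseStreamProducer.produce(event);
    log.info("published purchase event with record id: {}", recordId);
  }
}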

7. Message Consumer

We can consume messages from streams using two approaches:

  • Synchronous – the subscription commands are blocking. Calling StreamOperations.read(…) on a connection causes the current thread to block while it waits for messages. The thread is released only when the read command times out or a message arrives (see the sketch after this list).
  • Asynchronous – we use a StreamMessageListenerContainer to consume stream messages. Each time a new record arrives, the container notifies the registered StreamListener.
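
For completeness, here is a minimal sketch of the synchronous approach, assuming the same RedisTemplate<String, String> and stream key used throughout this article. The call blocks the current thread for up to the configured duration:

// blocks the calling thread for up to 2 seconds while waiting for records
List<ObjectRecord<String, PurchaseEvent>> records = redisTemplate.opsForStream()
    .read(PurchaseEvent.class,
        StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2)),
        StreamOffset.fromStart(streamKey));

if (records != null) {
  records.forEach(record -> log.info("read purchase event: {}", record.getValue()));
}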

The asynchronous message consumer can be configured in the following steps:

7.1. Create a Consumer Group

The following code creates a consumer group in Redis if one does not already exist. If the consumer group already exists, Redis raises an error, which we simply log before proceeding.

@Value("${stream.key:purchase-stream-events}")
private String streamKey;

private void createConsumerGroupIfNotExists(RedisConnectionFactory redisConnectionFactory,
    String streamKey, String groupName) {

    try {
        // the last argument (mkStream = true) creates the stream if it does not exist yet
        redisConnectionFactory.getConnection().streamCommands()
                .xGroupCreate(streamKey.getBytes(), groupName, ReadOffset.from("0-0"), true);
    } catch (RedisSystemException exception) {
        // BUSYGROUP: the consumer group already exists, which is safe to ignore
        log.warn(exception.getCause().getMessage());
    }
}
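
Alternatively, the body of createConsumerGroupIfNotExists() could use the higher-level StreamOperations API, assuming the RedisTemplate<String, String> bean is available in the same configuration class. This is only a sketch; depending on the Spring Data Redis version, this variant may not create the missing stream itself, so the low-level command above with mkStream=true is the safer choice:

try {
    // create the consumer group, reading from the beginning of the stream
    redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0-0"), groupName);
} catch (RedisSystemException exception) {
    // the group already exists
    log.warn(exception.getCause().getMessage());
}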

7.2. MessageListenerContainer

StreamMessageListenerContainer subscribes to a Redis Stream, consumes incoming messages, and sends them to the registered StreamListener instances for processing. It allows multiple read requests and returns a Subscription handle per read request. Cancelling the Subscription eventually terminates the background polling. Messages are converted using the key and value serializers configured on the RedisTemplate.

Furthermore, the container allows dynamic configuration changes during runtime, enabling the addition or removal of listeners without requiring a restart. Additionally, the container adopts a lazy subscription approach, utilizing the RedisConnection only when necessary. When all the listeners are unsubscribed, automatic cleanup is performed, and the thread is released.

@Value("${stream.key:purchase-stream-events}")
private String streamKey;

@Bean
public Subscription subscription(RedisConnectionFactory connectionFactory) throws UnknownHostException {

        createConsumerGroupIfNotExists(connectionFactory, streamKey, streamKey);

        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
                ObjectRecord<String, PurchaseEvent>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(100))
                .targetType(PurchaseEvent.class)
                .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, PurchaseEvent>>  container =
                StreamMessageListenerContainer
                .create(connectionFactory, options);

        Subscription subscription =
                container.receive(Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                streamOffset, purchaseStreamListener());

        container.start();
        return subscription;
}

@Bean
public StreamListener<String, ObjectRecord<String, PurchaseEvent>> purchaseStreamListener() {
  // handle message from stream
  return new PurchaseStreamListener();
}

//...
  • StreamOffset – Create a StreamOffset given key and ReadOffset.
  • ReadOffset – From where to start reading the messages, possible values:
    • latest() – Read from the latest offset.
    • lastConsumed() – Read all new arriving elements with ids greater than the last one consumed by the consumer group.
    • from(String offset) – Read all arriving elements from the stream starting at offset.
    • from(RecordId offset) – Read all arriving elements from the stream starting at RecordId.

Performing read operations based on a specific message ID or on the last consumed message guarantees the consumption of all messages added to the stream. However, reading with latest() can skip messages that were added to the stream during the dead time between two polls.
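
For illustration, these offsets can be constructed as follows; lastConsumed() is only meaningful when reading as part of a consumer group:

// read the whole stream from the very first entry
StreamOffset<String> fromBeginning = StreamOffset.create(streamKey, ReadOffset.from("0-0"));

// read only entries added after the subscription starts (may miss entries between polls)
StreamOffset<String> onlyNew = StreamOffset.create(streamKey, ReadOffset.latest());

// read entries not yet consumed by the consumer group (used in this article)
StreamOffset<String> groupTracked = StreamOffset.create(streamKey, ReadOffset.lastConsumed());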

7.3. StreamListener

The listener callback is triggered whenever a new message is received, and our processing code runs in the onMessage() method.

@Slf4j
@Service
public class PurchaseStreamListener implements StreamListener<String, ObjectRecord<String, PurchaseEvent>> {

    @Value("${stream.key:purchase-events}")
    private String streamKey;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, PurchaseEvent> record) {

        PurchaseEvent purchaseEvent = record.getValue();

        // store the received event as JSON, keyed by purchase id, for downstream reads
        redisTemplate.opsForValue().set(purchaseEvent.getPurchaseId(),
                objectMapper.writeValueAsString(purchaseEvent));

        // acknowledge the record so it is removed from the pending entries list (PEL)
        redisTemplate.opsForStream().acknowledge(streamKey, record);
    }
}

8. Acknowledging the Message Delivery

When reading messages via a consumer group, the server remembers that a given message was delivered and adds it to the Pending Entries List (PEL). Messages acknowledged via the RedisTemplate.opsForStream().acknowledge() method are removed from the pending entries list.

Unacknowledged messages keep accumulating in the PEL, adding memory overhead and eventually impacting performance.

@Override
@SneakyThrows
public void onMessage(ObjectRecord<String, PurchaseEvent> record) {
    ...
    ...
    redisTemplate.opsForStream().acknowledge(streamKey, record);
}
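
To check how many entries are still waiting for acknowledgement, we can query the PEL through the same stream API. A small sketch, assuming the consumer group name equals the stream key as configured earlier:

// summary of delivered-but-unacknowledged entries for the consumer group
PendingMessagesSummary summary = redisTemplate.opsForStream()
    .pending(streamKey, streamKey); // (stream key, group name)

if (summary != null) {
  log.info("total pending messages: {}", summary.getTotalPendingMessages());
}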

To auto-acknowledge messages on receipt, use receiveAutoAck() instead of receive() in the StreamMessageListenerContainer configuration.
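
For example, the receive(…) call in the subscription bean shown earlier could be replaced with the auto-acknowledging variant:

// delivered records are acknowledged automatically, so no explicit acknowledge() call is needed
Subscription subscription =
        container.receiveAutoAck(Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
        StreamOffset.create(streamKey, ReadOffset.lastConsumed()), purchaseStreamListener());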

9. Demo

To demo stream processing using Redis, we will write an integration test using Testcontainers. The Testcontainers library helps us run module-specific Docker containers to simplify integration testing.

We are using @SpyBean to spy on the PurchaseStreamListener instance in the application context so we can verify that the listener’s onMessage() method is called.

@SpringBootTest
@Testcontainers(disabledWithoutDocker = true)
class RedisStreamEventConsumerTest {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @SpyBean(name = "purchaseStreamListener")
    StreamListener<String, ObjectRecord<String, PurchaseEvent>> purchaseStreamListener;

    @Container
    private static final RedisContainer REDIS_CONTAINER =
            new RedisContainer(DockerImageName.parse("redis:5.0.3-alpine")).withExposedPorts(6379);

    @DynamicPropertySource
    private static void registerRedisProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.redis.host", REDIS_CONTAINER::getHost);
        registry.add("spring.data.redis.port", () -> REDIS_CONTAINER
                .getMappedPort(6379).toString());
    }

    @Test
    public void testOnMessage() throws Exception {

        PurchaseEvent purchaseEvent = PurchaseEvent.builder()
                .purchaseId("1")
                .productId("12")
                .productName("IPhone 14")
                .quantity(1)
                .price(74000)
                .build();

        String streamKey = "purchase-events";
        ObjectRecord<String, PurchaseEvent> record = StreamRecords.newRecord()
                .ofObject(purchaseEvent)
                .withStreamKey(streamKey);

        this.redisTemplate.opsForStream()
                .add(record);


        // wait briefly for the asynchronous listener to process the record
        CountDownLatch latch = new CountDownLatch(1);
        latch.await(3, TimeUnit.SECONDS);

        verify(purchaseStreamListener, times(1))
                .onMessage(isA(ObjectRecord.class));

        PurchaseEvent receivedEvent = objectMapper.readValue(
                redisTemplate.opsForValue().get(purchaseEvent.getPurchaseId()),
                PurchaseEvent.class);

        assertEquals(receivedEvent.getPurchaseId(), purchaseEvent.getPurchaseId());
        assertEquals(receivedEvent.getProductId(), purchaseEvent.getProductId());
        assertEquals(receivedEvent.getQuantity(), purchaseEvent.getQuantity());
        assertEquals(receivedEvent.getPrice(), purchaseEvent.getPrice());

        redisTemplate.expire(purchaseEvent.getPurchaseId(), Duration.ofSeconds(2));
    }

}

We execute the test case to verify that it runs successfully.

10. Conclusion

Redis offers in-memory stream processing capabilities for simple use cases. It is not meant for enterprise-class stream collection and processing requirements. If Redis is already present in the application for in-memory caching, it makes a lot of sense to use it for stream processing without adding new dependencies.

For advanced use cases, the recommended solutions are dedicated stream processing frameworks such as Apache Kafka, Apache Flink, and Spring Cloud Stream.

Source Code on GitHub
