Redis Pub/Sub with Spring Boot Data

In this Spring data tutorial, we will learn how to use Redis Pub/Sub to broadcast messages across multiple services in a Microservices architecture. As a prerequisite, you are expected to have a basic knowledge of Redis and Spring Boot integration.

1. When to Use Redis Publish/Subscribe?

A pub/sub design operates on an asynchronous messaging model for service-to-service communication. Unlike direct messaging to individual recipients, Pub/Sub allows services to broadcast messages (publishers) to a designated topic/channel, enabling interested services to receive and consume the messages (subscribers).

It’s important to note that Redis pub/sub is a simple messaging system and it lacks some advanced features. If we require more advanced messaging features such as message persistence, guaranteed delivery, or complex routing, then we should use dedicated message brokers like RabbitMQ, Apache Kafka, or AWS Simple Notification Service (SNS).

2. Installing Redis

For demo purposes, we are installing a Redis locally. Find more information about Linux and MacOS downloads here. Please note that Redis does not officially support Windows, but we can find a port of the server here.

To install Redis as a Docker container, run the following command:

$ docker run --name my-redis -p 6379:6379  -d redis

3. The Pub/Sub Services used for Demo

We are going to create 2 simple services:

  • order-events-publisher: publishes the sales events to a topic ‘order-events‘.
  • order-events-subscriber:  will be notified as and when new events are published. There could be an N number of subscribers. In our case, we will have 1 subscriber.

The following OrderEvent class contains the message fields sent for each message. Fields can vary based on requirements.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderEvent implements Serializable {

    private int orderId;
    private int userId;
    private String productName;
    private int price;
    private int quantity;
} 

4. Setting Up Redis with Spring Boot

Start with adding spring-boot-starter-data-redis dependency which includes the Lettuce, by default.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

For Spring applications, RedisTemplate is a more familiar approach due to the template pattern used in other classes such as JdbcTemplate or RestTemplate. It provides a higher-level abstraction for interacting with Redis.

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

Next, we’ll set up a topic (default name configured to ‘channel-events’) to which the publisher will send messages, and the subscriber will receive them.

@Configuration
public class RedisConfiguration {

  @Value("${redis.pubsub.topic:channel-events}")
  private String topic;

  @Bean
  public ChannelTopic channelTopic() {
      return new ChannelTopic(topic);
  }

  //...
}

5. Message Publisher

The RedisTemplate provides a simple API for the passage of arbitrary objects as messages. In the following example, the publisher uses the convertAndSend() method to format and publish an OrderEvent to the configured channel topic.

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private ChannelTopic channelTopic;

@Override
public Long publish(OrderEvent orderEvent){

    log.info("Sending message Sync: {}", orderEvents);
    return redisTemplate.convertAndSend(channelTopic.getTopic(), orderEvent);
}

The listener topic can be either a channel (for example, topic="order-events") or a pattern (for example, topic="*-events")

6. Message Subscriber

A message subscriber needs to configure the subscription in 3 steps:

6.1. RedisMessageListenerContainer

To enable asynchronous behavior for Redis message listeners, we start by creating a bean of the RedisMessageListenerContainer class provided by Spring Data Redis. This container serves as a message listener container, facilitating the reception of messages from a Redis channel and the management of injected MessageListener instances. The container takes care of thread handling for message reception and dispatches messages to the corresponding listeners for processing.

With RedisMessageListenerContainer, multiple listeners can share a single connection and thread, even if they don’t share a subscription. This feature ensures that the runtime cost remains consistent throughout the application’s lifespan, regardless of the number of listeners or channels it tracks.

Furthermore, the container allows dynamic configuration changes during runtime, enabling the addition or removal of listeners without requiring a restart. Additionally, the container adopts a lazy subscription approach, utilizing the RedisConnection only when necessary. When all the listeners are unsubscribed, automatic cleanup is performed, and the thread is released.

To support the asynchronous nature of messages, the container relies on java.util.concurrent.Executor (or Spring’s TaskExecutor) for message dispatching.

private final RedisConnectionFactory connectionFactory;

@Bean
public RedisMessageListenerContainer redisContainer(ChannelTopic topic) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(messageListener(), topic());
    return container;
}

6.2. MessageListenerAdapter

Next, we’ll define a MessageListenerAdapter bean which contains a custom implementation of the MessageListener interface called RedisMessageSubscriber. This bean acts as a subscriber in the pub-sub messaging model.

@Bean
public MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new OrderEventListener());
}

The OrderEventListener is a simple service that implements the MessageListener.onMessage() method and handle the messages publisher by the publisher service.

6.3. MessageListener

Whenever a new message is received, the callback is triggered, and our code is executed through the onMessage method.

  • The message parameter contains the actual message that is published by the publisher.
  • The byte[] pattern contains information related to the channel it has been received through and the pattern (if any) used by the subscription to match the channel.
@Service
@AllArgsConstructor
public class OrderEventListener implements MessageListener {

    private final ObjectMapper objectMapper;

    @Override
    public void onMessage(Message message, byte[] pattern) {

        try {
            log.info("New message received: {}", message);
            OrderEvent orderEvent = objectMapper.readValue(message.getBody(), OrderEvent.class);
            //Use the order event as necessary
        } catch (IOException e) {
            log.error("Error while parsing message");
        }
    }
}

7. Demo

To demo the publisher-subscriber communication, we will write the integration tests using TestContainers. TestContainers library helps us run module-specific Docker containers to simplify the integration testing.

We are using @MockBean to inject a mocked OrderEventListener instance in the application context so we can verify the message listener and message body.

@SpringBootTest
@Testcontainers(disabledWithoutDocker = true)
class MainTest {

  @Autowired
  private RedisTemplate<String, Object> redisTemplate;

  @MockBean
  OrderEventListener orderEventListener;

  @Autowired
  private ObjectMapper objectMapper;

  @Container
  static RedisContainer REDIS_CONTAINER =
      new RedisContainer(DockerImageName.parse("redis:5.0.3-alpine")).withExposedPorts(6379);

  @DynamicPropertySource
  private static void registerRedisProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.data.redis.host", REDIS_CONTAINER::getHost);
    registry.add("spring.data.redis.port", () -> REDIS_CONTAINER.getMappedPort(6379).toString());
  }

  @Test
  public void testOnMessage() throws Exception {

    OrderEvent orderEvent = OrderEvent.builder()
        .orderId("1")
        .userId("12")
        .productName("Mobile")
        .quantity(1)
        .price(42000)
        .build();

    redisTemplate.convertAndSend("order-events", orderEvent);

    Thread.sleep(1000);

    ArgumentCaptor<Message> argumentCaptor = ArgumentCaptor.forClass(Message.class);
    Mockito.verify(orderEventListener).onMessage(argumentCaptor.capture(), ArgumentMatchers.any());

    OrderEvent receivedEvent = objectMapper.readValue(argumentCaptor.getValue().getBody(),
        OrderEvent.class);

    assertEquals(receivedEvent.getOrderId(), orderEvent.getOrderId());
    assertEquals(receivedEvent.getQuantity(), orderEvent.getQuantity());
    assertEquals(receivedEvent.getPrice(), orderEvent.getPrice());
  }
}

Once, we verify that the test cases run successfully as shown below:

8. Conclusion

By developing the above two simple microservices, we showcased the utilization of Redis Pub/Sub in a Spring Boot application/microservice. The publisher and subscriber seamlessly communicated with each other through the Redis Pub/Sub feature without being tightly coupled. To learn more check out the official documentation.

Happy Learning !!

Sourcecode on Github

Comments

Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.