Guide to RSocket using Spring Boot

Learn the basics of RSocket protocol, four possible communication models and implement those models in Spring boot using the reactive types such as Mono and Flux.

1. Introduction to RSocket

HTTP is the most traditional and used way of communication between applications over networks. HTTP works on the request-response model which involves sending a message and then waiting for a response. It doesn’t easily allow asynchronous communication or allows for an open bidirectional channel for non-stop communication using the same connection.

RSocket, a relatively new protocol over TCP for inter-application communication, allows asynchronous communication using a reactive model consistent with reactive types like Flux and Mono. It is very important to know that with RSocket, we do not use the terms like “client” and “server” because both sides become symmetrical and each side can initiate the interaction. We refer to the participating sides as “requester” and “responder”.

Spring Boot uses rsocket-java which is built on Project Reactor. And boot uses the Reactor Netty as the embedded server for the transport.

2. Communication Models

RSocket provides the following 4 distinct communication models:

  • Request-Response: model mimics typical HTTP communication where the requester issues a single request and the responder responds with a single response. The only difference is that RSocket is fundamentally nonblocking and based on reactive types.
  • Request-Stream: similar to request-response, except the responder responds with a stream of zero-to-many values in a stream.
  • Fire-and-Forget: is used when the requester sends a request to the responder but doesn’t need a response. It is one-way communication.
  • Channel: is a bidirectional communication channel where both the requester and responder can send data to each other at any time.

The type of communication channel is decided by the reactive types used in the methods. The following table should help us understand how the model is applied:

Communication ModelInput TypesOutput Types
Fire-and-ForgetMono<T>Mono<Void>
Request-ResponseMono<T>Mono<T>
Request-StreamMono<T>Flux<T>
ChannelFlux<T>Flux<T>

3. Setting up RSocket with Spring Boot

3.1. Maven

Spring supports all four communication models offered by RSocket. We need to include the latest version of spring-boot-starter-rsocket dependency to include all necessary classes, interfaces and an embedded server to deploy the responders.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

3.2. Responder Server

If we are creating a rsocket responder application then we must define the spring.rsocket.server.port configuration property. Its value is the TCP port number where the responder will listen to incoming requests. If it is not set, Spring will assume that our application will act as a requester only, and no server port will be listening.

spring.rsocket.server.port=7000

3.3. Requester Client

If we are creating a rsocket requester application then we need to create RSocketRequester instance. Spring Boot autoconfiguration automatically creates a bean of type RSocketRequester.Builder. We can inject the builder bean into any class where we want to use RSocketRequester.

@Autowired
RSocketRequester.Builder requesterBuilder;

//Use builder to get RSocketRequester in any method
RSocketRequester rsocketRequester = requesterBuilder.tcp("localhost", 7000);

3.4. Message Handlers

Spring configuration will automatically detect @Controller beans with @MessageMapping annotation and use them as message handlers. This is same for responder and requester side, both. We can compare them with @GetMapping and @PostMapping for easy understanding.

@Controller
public class AppController {

    @MessageMapping("mapped-path")
    public Flux<ResponseType> handle(Mono<RequestType> request) {
        // ...
    }
}

The @DestinationVariable is similar to @PathVariable, and is used to extract the dynamic values from the mapped paths.

@MessageMapping("operation/{name}")
public Mono<String> operate(@DestinationVariable("name") String name) {
	...
}

On the requester side, @Payload annotation denotes the payload of the request of type Mono or Flux. @Payload is an optional annotation. We can rewrite the first example as follows:

@MessageMapping("mapped-path")
public Flux<ResponseType> handle(@Payload Mono<RequestType> request) {
    // ...
}

4. Implementing RSocket Communication Models

In the following sections, we will learn to implement the four communication models as discussed in the first section:

4.1. Request-Response

The following responder accepts a string (name) and returns a string (greeting message). It accepts a Mono and responds with a Mono.

@MessageMapping("greeting/{name}")
public Mono<String> greet(@DestinationVariable("name") String name, Mono<String> greetingMono) {

  return greetingMono
      .doOnNext(greeting ->
          log.info("Received a greeting from {} : {}", name, greeting))
      .map(greeting -> "Hello "+ name +"!");
}

On the requester side, we are using the RSocketRequester to send the message to the above responder and print the received message.

rsocketRequester
    .route("greeting/{name}", "Lokesh")
    .data("Hello there!")
    .retrieveMono(String.class)
    .subscribe(response -> log.info("RSocket response : {}", response));

The program output in the console is as follows. Note that we are running the requester and responder in the same machine.

...c.h.a.r.controller.MessageController     : Received a greeting from Lokesh : Hello there!
...c.howtodoinjava.app.requestResponse.App  : RSocket response : Hello Lokesh!

4.2. Request-Stream

The following responder accepts a string (stock name) and responds with the price change for that stock every second. It accepts a Mono and responds with a Flux.

@MessageMapping("stock/{symbol}")
public Flux<StockPrice> getStockPrice(@DestinationVariable("symbol") String symbol) {

  return Flux
      .interval(Duration.ofSeconds(1))
      .map(i -> {
        BigDecimal price = BigDecimal.valueOf(Math.random() * 10);
        return new StockPrice(symbol, price, Instant.now());
      });
}

On the requester side, we are using the RSocketRequester to send the stock name and print the changed stock price as it is received from the responder.

String stockSymbol = "TESLA";

rsocketRequester
    .route("stock/{symbol}", stockSymbol)
    .retrieveFlux(StockPrice.class)
    .doOnNext(stockPrice ->
        log.info(
            "Price of {} : {} (at {})",
            stockPrice.getSymbol(),
            stockPrice.getPrice(),
            stockPrice.getTimestamp())
    )
    .subscribe();

The program output in the console is as follows.

...c.howtodoinjava.app.requestStream.App : Price of TESLA : 7.11575778961884 (at 2022-12-15T11:39:03.987214700Z)
...c.howtodoinjava.app.requestStream.App : Price of TESLA : 8.050430827338355 (at 2022-12-15T11:39:04.991247900Z)
...c.howtodoinjava.app.requestStream.App : Price of TESLA : 4.367875149335129 (at 2022-12-15T11:39:05.993992Z)

4.3. Fire-and-Forget

The following responder accepts an Event, logs it and does not return anything. It accepts a Mono and responds with a Void.

@MessageMapping("event")
public Mono<Void> setAlert(Mono<Event> alertMono) {

  return alertMono
      .doOnNext(event ->
          log.info("Event Id '{}' occurred of type '{}' at '{}'",
              event.getId(),
              event.getType(),
              event.getTimestamp())
      )
      .thenEmpty(Mono.empty());
}

On the requester side, we are sending the event to the above responder and no further action is taken.

rsocketRequester
    .route("event")
    .data(new Event(11L, Type.ERROR, Instant.now()))
    .send()
    .subscribe();

The program output in the console is as follows.

...c.h.a.f.controller.EventController : Event Id '11' occurred of type 'ERROR' at '2022-12-15T11:43:06.425609500Z'

4.4. Channel

The following responder accepts an LoanDetails object that has details about a requested loan by the client. It checks teh loan details and responds with a Boolean value whether the loan should be granted or not. It accepts a Flux and responds with a Flux.

@MessageMapping("check-loan-eligibility")
public Flux<Boolean> calculate(Flux<LoanDetails> loanDetails) {

  return loanDetails
      .doOnNext(ld -> log.info("Calculating eligibility:  {}", ld))
      .map(ld -> {
        if(ld.getRate() > 9)
          return true;
        else
          return false;
      });
}

On the requester side, we send the requests every 2 seconds and log the response as it is received. We are using fromArray() method, but details could be a Flux created from any source of data such as a reactive data repository.

Flux<LoanDetails> lonaDetailsFlux = Flux.fromArray(new LoanDetails[]{
      new LoanDetails(100, 5, 1),
      new LoanDetails(200, 7, 1),
      new LoanDetails(300, 10, 1),
      new LoanDetails(400, 8, 1),
      new LoanDetails(600, 11, 1)
  })
  .delayElements(Duration.ofSeconds(2));

rsocketRequester
		.route("check-loan-eligibility")
		.data(lonaDetailsFlux)
		.retrieveFlux(Boolean.class)
		.subscribe(result -> log.info("Loan eligibility : {}", result));

The program output in the console is as follows.

...c.h.a.b.controller.LoanCheckController   : Calculating eligibility:  LoanDetails(amount=100.0, rate=5.0, years=1)
...c.howtodoinjava.app.bidirectional.App    : Loan eligibility : false
...c.h.a.b.controller.LoanCheckController   : Calculating eligibility:  LoanDetails(amount=200.0, rate=7.0, years=1)
...c.howtodoinjava.app.bidirectional.App    : Loan eligibility : false
...c.h.a.b.controller.LoanCheckController   : Calculating eligibility:  LoanDetails(amount=300.0, rate=10.0, years=1)
...c.howtodoinjava.app.bidirectional.App    : Loan eligibility : true

5. RSocket over WebSocket

By default, RSocket communication takes place over a TCP socket. Suppose, in our network infrastructure, using TCP ports is not allowed due to security issues. Or the requester is a strict HTTP client such as a browser. In such cases, we can not use TCP protocol and RSocket must be transported over WebSocket which uses HTTP as the primary means of communication.

On the responder side, to switch from TCP transport to WebSocket transport, we need to perform two steps:

Include spring-boot-starter-webflux dependency that supports handling HTTP requests.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Configure the spring.rsocket.server.mapping-path property for HTTP path. Also, there is no need to set spring .rsocket.server.port property.

spring.rsocket.server.mapping-path = /rsocket

# No need to set TCP port
#spring .rsocket.server.port=7000

On the requester side, we only need to use requesterBuilder.websocket() method to create a WebSocket-based requester. Rest all things remain same.

RSocketRequester requester = requesterBuilder.websocket(URI.create("ws://localhost:8080/rsocket"));

requester
    .route("message-path")
    ....
    ....

6. Conclusion

In this Spring Boot and RSocket tutorial, we learned the basics of RSocket protocol, its four communication methods and implementing requester client and responder server for each method using the reactive types in Spring boot.

We also learned to use the same code over HTTP protocol using the WebSocket if the TCP protocol is not available to use.

Happy Learning !!

Sourcecode on Github

Comments

Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.

Our Blogs

REST API Tutorial

Dark Mode

Dark Mode