HowToDoInJava

  • Python
  • Java
  • Spring Boot
  • Dark Mode
Home / Spring Boot 2 / Spring boot async controller with SseEmitter

Spring boot async controller with SseEmitter

Learn to write spring boot async rest controller using SseEmitter which is a specialization of ResponseBodyEmitter for sending Server-Sent Events.

1. SseEmitter class

The SseEmitter can deliver events from the server to the client. Server-Sent-Events are messages from the server to the client. They have a Content-Type header of text/event-stream.

The events are pretty simple and have only four fields.

FieldDescription
idThe ID of the event
eventthe type of event
dataThe event data
retryReconnection time for the event stream

2. How to use SseEmitter

To send events from a request handling method, you need to create an instance of SseEmitter and return it from the request handling method. Then use the emitter.send() method to send individual elements to the client.

@RequestMapping(value="/resource-uri", method=RequestMethod.GET)
public SseEmitter handle() 
{
     SseEmitter emitter = new SseEmitter();

     // Pass the emitter to another component...
     return emitter;
}

// in another thread
 emitter.send(foo1);

 // and again
 emitter.send(foo2);

 // and done
 emitter.complete();

If you want to add more info to the event, use the SseEventBuilder . The event() factory-method of the SseEmitter creates an instance. Use it to fill the id and event fields.

SseEventBuilder eventBuilder = SseEmitter.event();

emitter.send(
                  eventBuilder
                  .data(dataSet)
                  .name("dataSet-created")
                  .id(String.valueOf(dataSet.hashCode()))
            );

3. Async controller example using SseEmitter

In given controller method, we are accessing the data sets (use your own domain datatypes).

  • There is data service which return datasets from DB or any other source.
  • Each dataset is then processed (e.g. retrieve related information from other source) which takes time. This is simulated using an artificial delay by calling thread.sleep() method.
  • Each data set is then added to SseEmitter object using emitter.send() method.
  • Finally emitter.complete() is called to mark that request processing is complete so that the thread responsible for sending the response can complete the request and be freed up for the next response to handle.
  • If any error is encountered while request processing, complete the process by emitter.completeWithError(). The exception will pass through the normal exception handling of Spring MVC and after that the response is completed.
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.howtodoinjava.springasyncexample.web.model.DataSet;
import com.howtodoinjava.springasyncexample.web.service.DataSetService;

@RestController
public class DataSetController {

      private final DataSetService dataSetService;

      public DataSetController(DataSetService dataSetService) {
            this.dataSetService = dataSetService;
      }

      @GetMapping("/emit-data-sets")
      public SseEmitter fetchData2() 
      {
            SseEmitter emitter = new SseEmitter();

            ExecutorService executor = Executors.newSingleThreadExecutor();

            executor.execute(() -> 
            {
                  List<DataSet> dataSets = dataSetService.findAll();
                  try {
                        for (DataSet dataSet : dataSets) {

                              randomDelay();
                              emitter.send(dataSet);
                        }

                        emitter.complete();

                  } catch (IOException e) {
                        emitter.completeWithError(e);
                  }
            });
            executor.shutdown();
            return emitter;
      }

      private void randomDelay() {
            try {
                  Thread.sleep(1000);
            } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
            }
      }
}

4. How to test SseEmitter response

4.1. Mock testing with JUnit

To test the above controller method, I am using mockito shipped with spring boot distribution.

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import java.math.BigInteger;
import java.util.Arrays;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;

import com.howtodoinjava.springasyncexample.web.controller.DataSetController;
import com.howtodoinjava.springasyncexample.web.model.DataSet;
import com.howtodoinjava.springasyncexample.web.service.DataSetService;

@RunWith(SpringRunner.class)
@WebMvcTest(DataSetController.class)
public class DataSetControllerTest 
{
      @Autowired
      private MockMvc mockMvc;
      
      @MockBean
      private DataSetService dataSetService;

      @Test
      public void foo() throws Exception 
      {
            Mockito.when(dataSetService.findAll())
                        .thenReturn(Arrays.asList(new DataSet(BigInteger.valueOf(1), "data")));
            
            MvcResult mvcResult = mockMvc.perform(get("/emit-data-sets"))
                                                            .andExpect(request().asyncStarted())
                                                            .andDo(MockMvcResultHandlers.log())
                                                            .andReturn();
            
            mockMvc.perform(asyncDispatch(mvcResult))
                  .andDo(MockMvcResultHandlers.log())
                  .andExpect(status().isOk())
                  .andExpect(content().contentType("text/event-stream;charset=UTF-8"));
            
            String event = mvcResult.getResponse().getContentAsString();
            event = event.replaceAll("data:", "");
            event = event.replaceAll("\\n", "");
            
            new JsonPathExpectationsHelper("$.id").assertValue(event, "1");
            new JsonPathExpectationsHelper("$.name").assertValue(event, "data");
      }
}

Program output.

MockHttpServletResponse:
           Status = 200
    Error message = null
          Headers = {Content-Type=}
     Content type = text/event-stream;charset=UTF-8
             Body = data:{"id":1,"name":"data"}


    Forwarded URL = null
   Redirected URL = null
          Cookies = []

4.2. Browser testing

To test in browser, start the application using class SpringAsyncExampleApplication and hit the URL in browser: http://localhost:8080/emit-data-sets

Check the response returned from server coming in form of events and delay between events.

SseEmitter Example
SseEmitter Example

Notice the Content-Type header has a value of text/event-stream to indicate that we get a stream of events. The stream can be kept open and receiving event notifications. Each object written is converted to JSON with an HttpMessageConverter . Each object is written in the data tag as the event data.

5. Async configuration options

To override the default async behavior such as thread pool and timeout, you can implement the WebMvcConfigurer interface and override it’s configureAsyncSupport() method.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@SpringBootApplication
public class SpringAsyncExampleApplication implements WebMvcConfigurer {

      public static void main(String[] args) {
            SpringApplication.run(SpringAsyncExampleApplication.class, args);
      }

      @Override
      public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            configurer.setTaskExecutor(mvcTaskExecutor());
            configurer.setDefaultTimeout(30_000);
      }

      @Bean
      public ThreadPoolTaskExecutor mvcTaskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setThreadNamePrefix("mvc-task-");
            return taskExecutor;
      }
}

6. Sourcecode files

6.1. DataSetService.java

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import com.howtodoinjava.springasyncexample.web.model.DataSet;

@Service
public class DataSetService 
{
      private final List<DataSet> datasetList = new ArrayList<>();

      @PostConstruct
      public void setup() {
            createDataSets();
      }

      public List<DataSet> findAll() {
            return Collections.unmodifiableList(datasetList);
      }

      private Iterable<DataSet> createDataSets() 
      {
            String name = "dummy text_";
            
            for (int i = 0; i < 5; i++) {
                  this.datasetList.add( new DataSet(BigInteger.valueOf(i), name + i) );
            }
            return datasetList;
      }
}

6.2. DataSet.java

import java.math.BigInteger;

public class DataSet 
{
      private BigInteger id;
      private String name;
      
      public DataSet(BigInteger id, String name) {
            this.id = id;
            this.name = name;
      }

      //Getters and setters

      @Override
      public String toString() {
            return "DataSet [id=" + id + ", name=" + name + "]";
      }
}

6.3. application.properties

Enable debug logging here to understand the behavior of the application.

logging.level.org.springframework=DEBUG
logging.level.com.howtodoinjava=DEBUG

6.4. pom.xml

The used pom.xml is:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      
      <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
      </parent>
      
      <groupId>com.howtodoinjava.demo</groupId>
      <artifactId>spring-async-demo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>spring-async-demo</name>
      <description>Demo project for Spring Boot</description>

      <properties>
            <java.version>1.8</java.version>
      </properties>

      <dependencies>
      
            <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            
            <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-webflux</artifactId>
            </dependency>

            <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
                  <scope>test</scope>
            </dependency>
            <dependency>
                  <groupId>io.projectreactor</groupId>
                  <artifactId>reactor-test</artifactId>
                  <scope>test</scope>
            </dependency>
      </dependencies>

      <build>
            <plugins>
                  <plugin>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-maven-plugin</artifactId>
                  </plugin>
            </plugins>
      </build>
      
      <repositories>
        <repository>
            <id>repository.spring.release</id>
            <name>Spring GA Repository</name>
            <url>http://repo.spring.io/release</url>
        </repository>
    </repositories>

</project>

Let me know if you face any error while executing this async rest controller example using SseEmitter.

Happy Learning !!

Download Sourcecode

Was this post helpful?

Let us know if you liked the post. That’s the only way we can improve.
TwitterFacebookLinkedInRedditPocket

About Lokesh Gupta

A family guy with fun loving nature. Love computers, programming and solving everyday problems. Find me on Facebook and Twitter.

Feedback, Discussion and Comments

  1. Akilan

    March 26, 2020

    I can’t able to see, emitted message in chrome -> developer tools -> Event stream tab

  2. Marcelo

    December 17, 2019

    Hello. Since your application only works by accessing the URL as localhost. If I try to access from another machine does not behave the same way. do you have any idea?

    • Lokesh Gupta

      December 17, 2019

      No idea how you are accessing it, so not able to answer.

Comments are closed on this article!

Search Tutorials

Spring Boot REST

  • SB REST – Hello World
  • SB REST – @RestController
  • SB REST – JSON
  • SB REST – POST with Headers
  • SB REST – HATEOAS
  • SB REST – @Async
  • SB REST – SseEmitter
  • SB REST – ResponseBodyEmitter
  • SB REST – Callable
  • SB REST – Unit Testing
  • SB REST – Gzip Compression
  • SB REST – i18n

Spring Boot 2 Tutorial

  • Spring Boot – Introduction
  • Spring Boot – Starter parent
  • Spring Boot – Starter templates
  • Spring Boot – Multi-module project
  • Spring Boot – Annotations
  • Spring Boot – Auto configuration
  • Spring Boot – AOP
  • Spring Boot – Logging
  • Spring Boot – DevTools
  • Spring Boot – WAR Packaging
  • Spring Boot – REST API
  • Spring Boot – CRUD
  • Spring Boot – OAuth2
  • Spring Boot – Testing
  • Spring Boot – RestTemplate
  • Spring Boot – Thymeleaf
  • Spring Boot – Hibernate
  • Spring Boot – DataSource
  • Spring Boot – Error Handling
  • Spring Boot – Caching
  • Spring Boot – Retry
  • Spring Boot – BasicAuth
  • Spring Boot – H2 Database
  • Spring Boot – Ehcache 3.x
  • Spring Boot – Gson
  • Spring Boot – RMI
  • Spring Boot – Send Email
  • Spring Boot – Interview Questions

Spring Boot Tutorial

  • Spring Boot – CommandLineRunner
  • Spring Boot – Configure Jetty
  • Spring Boot – Tomcat Default Port
  • Spring Boot – Context Root
  • Spring Boot – SSL [https]
  • Spring Boot – Get all loaded beans
  • Spring Boot – PropertyEditor
  • Spring Boot – @EnableScheduling
  • Spring Boot – Jersey
  • Spring Boot – SOAP Webservice
  • Spring Boot – SOAP Client
  • Spring Boot – JMSTemplate
  • Spring Boot – REST APIs
  • Spring Boot – JSP View
  • Spring Boot – Actuator endpoints
  • Spring Boot – Role Based Security
  • Spring Boot – RSS / ATOM Feed
  • Spring Boot – Ehcache 2.x

Meta Links

  • About Me
  • Contact Us
  • Privacy policy
  • Advertise
  • Guest and Sponsored Posts

Recommended Reading

  • 10 Life Lessons
  • Secure Hash Algorithms
  • How Web Servers work?
  • How Java I/O Works Internally?
  • Best Way to Learn Java
  • Java Best Practices Guide
  • Microservices Tutorial
  • REST API Tutorial
  • How to Start New Blog

Copyright © 2020 · HowToDoInjava.com · All Rights Reserved. | Sitemap

  • Sealed Classes and Interfaces