Spring WebFlux Tutorial

The reactive-stack web framework, Spring WebFlux, has been added Spring 5.0. It is fully non-blocking, supports reactive streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers. In this spring webflux tutorial, we will learn the basic concepts behind reactive programming, webflux apis and a fully functional hello world example.

1. Reactive Programming

Reactive programming is a programming paradigm that promotes an asynchronous, non-blocking, event-driven approach to data processing. Reactive programming involves modeling data and events as observable data streams and implementing data processing routines to react to the changes in those streams.

Before digging deeper into reactive world, first understand the difference between blocking vs non-blocking request processing.

1.1. Blocking vs non-blocking (async) request processing

1.1.1. Blocking request processing

In traditional MVC applications, when a request come to server, a servlet thread is created. It delegates the request to worker threads for I/O operations such as database access etc. During the time worker threads are busy, servlet thread (request thread) remain in waiting status and thus it is blocked. It is also called synchronous request processing.

Blocking request processing
Blocking request processing

As server can have some finite number of request threads, it limits the server capability to process that number of requests at maximum server load. It may hamper the performance and limit the full utilization of server capability.

1.1.2. Non-blocking request processing

In non-blocking or asynchronous request processing, no thread is in waiting state. There is generally only one request thread receiving the request.

All incoming requests come with a event handler and call back information. Request thread delegates the incoming requests to a thread pool (generally small number of threads) which delegate the request to it’s handler function and immediately start processing other incoming requests from request thread.

When the handler function is complete, one of thread from pool collect the response and pass it to the call back function.

Non-blocking request processing
Non-blocking request processing

Non-blocking nature of threads helps in scaling the performance of the application. Small number of threads means less memory utilization and also less context switching as well.

1.2. What is reactive programming?

The term, “reactive,” refers to programming models that are built around reacting to changes. It is build around publisher-subscriber pattern (observer pattern). In reactive style of programming, we make a request for resource and start performing other things. When the data is available, we get the notification along with data inform of call back function. In callback function, we handle the response as per application/user needs.

One important thing to remember is back pressure. In non-blocking code, it becomes important to control the rate of events so that a fast producer does not overwhelm its destination.

Reactive web programming is great for applications that have streaming data, and clients that consume it and stream it to their users. It is not great for developing traditional CRUD applications. If you’re developing the next Facebook or Twitter with lots of data, a reactive API might be just what you’re looking for.

2. Reactive Streams API

The new Reactive Streams API was created by engineers from Netflix, Pivotal, Lightbend, RedHat, Twitter, and Oracle, among others and is now part of Java 9. It defines four interfaces:

  • Publisher: Emits a sequence of events to subscribers according to the demand received from its subscribers. A publisher can serve multiple subscribers.

    It has a single method:

    public interface Publisher<T> 
    {
    	public void subscribe(Subscriber<? super T> s);
    }
    
  • Subscriber: Receives and processes events emitted by a Publisher. Please note that no notifications will be received until Subscription#request(long) is called to signal the demand.

    It has four methods to handle various kind of responses received.

    public interface Subscriber<T> 
    {
    	public void onSubscribe(Subscription s);
    	public void onNext(T t);
    	public void onError(Throwable t);
    	public void onComplete();
    }
    
  • Subscription: Defines a one-to-one relationship between a Publisher and a Subscriber. It can only be used once by a single Subscriber. It is used to both signal desire for data and cancel demand (and allow resource cleanup).
    public interface Subscription<T> 
    {
    	public void request(long n);
    	public void cancel();
    }
    
  • Processor: Represents a processing stage consisting of both a Subscriber and a Publisher and obeys the contracts of both.
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> 
    {
    }
    

Two popular implementations of reactive streams are RxJava (https://github.com/ReactiveX/RxJava) and Project Reactor (https://projectreactor.io/).

3. What is Spring WebFlux ?

Spring WebFlux is parallel version of Spring MVC and supports fully non-blocking reactive streams. It support the back pressure concept and uses Netty as inbuilt server to run reactive applications. If you are familiar with Spring MVC programming style, you can easily work on webflux also.

Spring webflux uses project reactor as reactive library. Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure. It is developed in close collaboration with Spring.

Spring WebFlux heavily uses two publishers :

  • Mono: Returns 0 or 1 element.
    Mono<String> mono = Mono.just("Alex");
    Mono<String> mono = Mono.empty();
    
  • Flux: Returns 0…N elements. A Flux can be endless, meaning that it can keep emitting elements forever. Also it can return a sequence of elements and then send a completion notification when it has returned all of its elements.
    Flux<String> flux = Flux.just("A", "B", "C");
    Flux<String> flux = Flux.fromArray(new String[]{"A", "B", "C"});
    Flux<String> flux = Flux.fromIterable(Arrays.asList("A", "B", "C"));
    
    //To subscribe call method
    
    flux.subscribe();
    

In Spring WebFlux, we call reactive APIs/functions that return monos and fluxes and your controllers will return monos and fluxes. When you invoke an API that returns a mono or a flux, it will return immediately. The results of the function call will be delivered to you through the mono or flux when they become available.

To build a truly non-blocking application, we must aim to create/use all of its components as non-blocking i.e. client, controller, middle services and even the database. If one of them is blocking the requests, our aim will be defeated.

4. Spring Boot WebFlux Example

In this Spring boot 2 application, I am creating employee management system. I chosen it because, while learning, you can compare it with traditional MVC style application. To make it fully non-blocking, I am using mongodb as back-end database.

4.1. Maven dependencies

Include spring-boot-starter-webflux, spring-boot-starter-data-mongodb-reactive, spring-boot-starter-test and reactor-test dependencies.

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<groupId>com.howtodoinjava</groupId>
	<artifactId>spring-webflux-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-webflux-demo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>javax.xml.bind</groupId>
			<artifactId>jaxb-api</artifactId>
			<version>2.3.0</version>
		</dependency>
		<dependency>
			<groupId>javax.servlet</groupId>
			<artifactId>javax.servlet-api</artifactId>
			<version>3.1.0</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>

</project>

4.2. Configurations

Webflux Configuration

import org.springframework.context.annotation.Configuration;

@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer 
{	
}

MongoDb Configuration

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;

@Configuration
@EnableReactiveMongoRepositories(basePackages = "com.howtodoinjava.demo.dao")
public class MongoConfig extends AbstractReactiveMongoConfiguration 
{	
	@Value("${port}")
	private String port;
	
	@Value("${dbname}")
	private String dbName;

	@Override
	public MongoClient reactiveMongoClient() {
		return MongoClients.create();
	}

	@Override
	protected String getDatabaseName() {
		return dbName;
	}

	@Bean
	public ReactiveMongoTemplate reactiveMongoTemplate() {
		return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
	}
}

Application Configuration

import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class AppConfig 
{
	@Bean
	public static PropertyPlaceholderConfigurer getPropertyPlaceholderConfigurer() 
	{
		PropertyPlaceholderConfigurer ppc = new PropertyPlaceholderConfigurer();
		ppc.setLocation(new ClassPathResource("application.properties"));
		ppc.setIgnoreUnresolvablePlaceholders(true);
		return ppc;
	}
}

Properties file

port=27017
dbname=testdb

Logging configuration

<configuration>

	<appender name="STDOUT"
		class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n
			</pattern>
		</encoder>
	</appender>

	<logger name="org.springframework" level="DEBUG"
		additivity="false">
		<appender-ref ref="STDOUT" />
	</logger>

	<root level="ERROR">
		<appender-ref ref="STDOUT" />
	</root>

</configuration>

Spring boot application

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebfluxFunctionalApp {

	public static void main(String[] args) {
		SpringApplication.run(WebfluxFunctionalApp.class, args);
	}
}

4.3. REST Controller

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.howtodoinjava.demo.model.Employee;
import com.howtodoinjava.demo.service.EmployeeService;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class EmployeeController {
	@Autowired
	private EmployeeService employeeService;

	@RequestMapping(value = { "/create", "/" }, method = RequestMethod.POST)
	@ResponseStatus(HttpStatus.CREATED)
	public void create(@RequestBody Employee e) {
		employeeService.create(e);
	}

	@RequestMapping(value = "/{id}", method = RequestMethod.GET)
	public ResponseEntity<Mono<Employee>> findById(@PathVariable("id") Integer id) {
		Mono<Employee> e = employeeService.findById(id);
		HttpStatus status = e != null ? HttpStatus.OK : HttpStatus.NOT_FOUND;
		return new ResponseEntity<Mono<Employee>>(e, status);
	}

	@RequestMapping(value = "/name/{name}", method = RequestMethod.GET)
	public Flux<Employee> findByName(@PathVariable("name") String name) {
		return employeeService.findByName(name);
	}

	@RequestMapping(method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<Employee> findAll() {
		Flux<Employee> emps = employeeService.findAll();
		return emps;
	}

	@RequestMapping(value = "/update", method = RequestMethod.PUT)
	@ResponseStatus(HttpStatus.OK)
	public Mono<Employee> update(@RequestBody Employee e) {
		return employeeService.update(e);
	}

	@RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
	@ResponseStatus(HttpStatus.OK)
	public void delete(@PathVariable("id") Integer id) {
		employeeService.delete(id).subscribe();
	}

}

4.4. Service classes

import com.howtodoinjava.demo.model.Employee;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface IEmployeeService 
{
	void create(Employee e);
	
	Mono<Employee> findById(Integer id);

	Flux<Employee> findByName(String name);

	Flux<Employee> findAll();

	Mono<Employee> update(Employee e);

	Mono<Void> delete(Integer id);
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.howtodoinjava.demo.dao.EmployeeRepository;
import com.howtodoinjava.demo.model.Employee;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class EmployeeService implements IEmployeeService {
	
	@Autowired
	EmployeeRepository employeeRepo;

	public void create(Employee e) {
		employeeRepo.save(e).subscribe();
	}

	public Mono<Employee> findById(Integer id) {
		return employeeRepo.findById(id);
	}

	public Flux<Employee> findByName(String name) {
		return employeeRepo.findByName(name);
	}

	public Flux<Employee> findAll() {
		return employeeRepo.findAll();
	}

	public Mono<Employee> update(Employee e) {
		return employeeRepo.save(e);
	}

	public Mono<Void> delete(Integer id) {
		return employeeRepo.deleteById(id);
	}

}

4.5. DAO repository

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

import com.howtodoinjava.demo.model.Employee;

import reactor.core.publisher.Flux;

public interface EmployeeRepository extends ReactiveMongoRepository<Employee, Integer> {
	@Query("{ 'name': ?0 }")
	Flux<Employee> findByName(final String name);
}

4.6. Model

import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Scope(scopeName = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)
@Document
public class Employee {

	@Id
	int id;
	String name;
	long salary;

	//Getters and setters

	@Override
	public String toString() {
		return "Employee [id=" + id + ", name=" + name + ", salary=" + salary + "]";
	}
}

5. Demo

Start the application and check requests and responses.

  • HTTP POST http://localhost:8080/create
  • {
    	"id":1,
    	"name":"user_1",
    	"salary":101
    }
    
    {
    	"id":2,
    	"name":"user_2",
    	"salary":102
    }
    
  • HTTP PUT http://localhost:8080/update
  • {
    	"id":2,
    	"name":"user_2",
    	"salary":103
    }
    
  • HTTP GET http://localhost:8080/
  • data:{"id":1,"name":"user_1","salary":101}
    
    data:{"id":2,"name":"user_2","salary":102}
    
    Spring WebFlux Demo
    Spring WebFlux Demo

    Notice that I am testing the API with Postman chrome browser extension which is a blocking client. It will display the result only when It has collected the response for both employees.

    To verify the non-blocking response feature, hit the URL in the chrome browser directly. The results will appear one by one, as and when they are available in form of events (text/event-stream). To better view the result, consider adding a delay to controller API.

    Spring WebFlux Demo - Event Stream
    Spring WebFlux Demo – Event Stream

    6. Spring WebFlux Tutorial – Conclusion

    Both Spring MVC and Spring WebFlux support client-server architecture but there is a key difference in the concurrency model and the default behavior for blocking nature and threads. In Spring MVC, it is assumed that applications can block the current thread while in webflux, threads are non-blocking by default. It is the main difference between spring webflux vs mvc.

    Reactive and non-blocking generally do not make applications run faster. The expected benefit of reactive and non-blocking is the ability to scale the application with a small, fixed number of threads and lesser memory requirements. It makes applications more resilient under load because they scale in a more predictable manner.

    Drop me your questions related to this spring boot webflux tutorial.

    Happy Learning !!

    Was this post helpful?

    Join 7000+ Fellow Programmers

    Subscribe to get new post notifications, industry updates, best practices, and much more. Directly into your inbox, for free.

    29 thoughts on “Spring WebFlux Tutorial”

    1. Hi, I have a question:

      assume two clients call findByName method independently at the same time, doesn’t create tow thread for those client in this sample?!

      Reply
    2. I have question
      1.how can we do reactive style if record is exist then update or insert?
      2. If above possible for large data.

      Reply
    3. I do have a question:
      while we can use Tomcat 8+ with NIO mode by default, why the benifit to use Webflux, since the majar benifit is brought by NIO

      Reply
    4. Hi , Nice explanation about concept , But while download and running the code and while calling create It is created null values with null id.
      id:0
      salary:0
      _class:”com.howtodoinjava.demo.model.Employee”

      Please let me know in case any changes required in the code.
      Thanks,

      Reply
    5. Nice Article.

      If you want to see visually how FLUX works – then add this method & create bulk of records first, by calling the endpoint http://localhost:8080/callme

      @RequestMapping(value = { "/callme" }, method = RequestMethod.GET)
          @ResponseStatus(HttpStatus.CREATED)
          @ResponseBody
          public void callMe() {
              for (int i=1;i&lt;1000000; i++) {
                  Employee emp = new Employee();
                  emp.setId(i);
                  emp.setName(&quot;user_&quot; + i);
                  emp.setSalary(100 + 1);
                  employeeService.create(emp);
              }
          }

      Post successful execution of above endpoint, call http://localhost:8080 and see how information renders on browser.
      Also, the processing stops the moment you close the browser window or click 'X' (i.e. stop loading) on browser address bar. You will see below message on server console –
      19:27:13.464 [reactor-http-nio-3] DEBUG o.s.w.s.DisconnectedClient – [4003e220] Client went away: java.io.IOException: Broken pipe (stacktrace at TRACE level for 'org.springframework.web.server.DisconnectedClient')

      Reply
    6. Hi, thanks for the great article.

      Can you please elaborate on this?

      “In non-blocking code, it becomes important to control the rate of events so that a fast producer does not overwhelm its destination.”

      How does the producer really overwhelm the destination? Do you mean with the rate of incoming requests and the fact that there are only a few request handlers available?

      Reply
    7. Do I need to install mongoDB for running this application? because for me I post request is success but get is failing with ” com.mongodb.MongoTimeoutException: Timed out after 30000 ms”

      Reply
      • Yes… As per my understanding we need to have a local mongo db server running in localhost’s default port. To overcome this we can use a embedded mongo db (We just need to add below dependency to our POM file). This is very similar to embedded H2 DB.

        de.flapdoodle.embed
        de.flapdoodle.embed.mongo
        test

        Reply
    8. Hi
      why findById() returns ResponseEntity<Mono<Employee>> while findByName() returns Flux<Employee>
      The client expects the same employee.

      Reply
    9. The results will appear one by one in the browser by adding a delay in this way :

      @RequestMapping(method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
      @ResponseBody
      public Flux findAll() throws InterruptedException {
      return employeeService.findAll().delayElements(Duration.ofMillis(2500));;
      }

      Reply
    10. hi
      thanks for your article.
      I have a question. I use mysql for database in my project. I think there is not reactive api for mysql.
      ِDespite Can I use spring reactive? or spring reacvtive effective for my project?
      another question is how do cancel request in spring reactive?
      thanks alot.

      Reply
    11. Hi Lokesh,

      Do you have any idea how to return html views from the controller similar to in MVC without using any templating engine.

      Reply
    12. The demo source code didn’t compile out of the box. Specifically, I had to edit this file

      src/main/java/com/howtodoinjava/demo/config/WebFluxConfig.java

      and add these two import statements

      import org.springframework.web.reactive.config.WebFluxConfigurer;
      import org.springframework.web.reactive.config.EnableWebFlux;
      
      Reply
    13. data:{}

      this is the result am getting for get, what am i doing wrong, I followed as is, the inserts, updates worked fine.

      Thanks for good tutorial Lokesh, I am a huge fan of you.

      Chaitanya.

      Reply

    Leave a Comment

    HowToDoInJava

    A blog about Java and its related technologies, the best practices, algorithms, interview questions, scripting languages, and Python.