Build a Reactive RESTful API using Spring Boot WebFlux - vidyasekaran/current_learning GitHub Wiki

Other links : https://www.youtube.com/watch?v=M3jNn3HMeWg

https://spring.io/projects/spring-data-r2dbc

Spring Data R2DBC, part of the larger Spring Data family, makes it easy to implement R2DBC based repositories. R2DBC stands for Reactive Relational Database Connectivity, an incubator to integrate relational databases using a reactive driver. Spring Data R2DBC applies familiar Spring abstractions and repository support for R2DBC. It makes it easier to build Spring-powered applications that use relational data access technologies in a reactive application stack.

The core modules covered here are:

  • reactor-core
  • reactor-test
  • reactor-netty

code copied - D:\Udemy_Reactive_SpringBootWebFlux_dilip git : https://github.com/vidyasekaran/Teach-ReactiveSpring.git

reactor-core is the core library of Project Reactor. It implements the Reactive Streams specification and requires Java 8 or later.

Reactive Streams spec

Flux and Mono are the two reactive types that Project Reactor provides.

Flux - represents 0 to N elements.
Mono - represents 0 to 1 element.

When to use Mono and Flux

Flux

One way to create a Flux is Flux.just(...). In a real application the data usually comes from a database.

Here we append "flux" to every element passed to just() and print the results to the console:

Flux.just("Spring", "spring boot", "reactive")
    .map(s -> s.concat("flux"))
    .subscribe(System.out::println);

project reactor flux https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

Mono

If you receive only one element from a REST call or from a database, use Mono.

Mono.just("Spring")
    .map(s -> s.concat("flux"))
    .subscribe(System.out::println);

Section 6: Project Setup

A Gradle project is set up here; Maven can be used as well.

Dependencies: reactive web, reactive MongoDB, Lombok, embedded MongoDB (for writing test cases).

Why reactive programming?

Expectations of the app:

  • Scale based on load, with no downtime.
  • Use resources efficiently (a thread that sits waiting for a response is not efficient).
  • Keep latency (response time) low.

How a request and response are handled in Spring MVC

  • Spring MVC is the usual choice for REST apps.
  • It uses the thread-per-request model.

Type 1 DB at end

Client ---> Servlet Container <-2-> Filter <-3-> DispatcherServlet <-4-> RequestMapping/HandlerMapping <-Controller-> Service(optional) <-->DB (Thread Pool)

Type 2 Remote at end

Client ---> Servlet Container <-2-> Filter <-3-> DispatcherServlet <--> RequestMapping/HandlerMapping <-Controller-> Service(optional) <-->Remote API (Thread Pool)

Type 3 combination of DB and Remote API

Client ---> Servlet Container <-2-> Filter <-3-> DispatcherServlet <--> RequestMapping/HandlerMapping <-Controller-> Service(optional) <-->Remote API & DB

The thread-per-request model has several disadvantages

Client -----Request----> Application (embedded Tomcat has a thread pool in which each thread handles one request at a time)

**Adding more threads to the Tomcat thread pool**

  • Only a limited number of threads can be added; too many threads degrade performance.

    The pool size is controlled by the property server.tomcat.max-threads (server.tomcat.threads.max from Spring Boot 2.3 onward). The default is 200 threads, so at most 200 requests are processed concurrently.

    It can be overridden in application.yml.

  • Each additional thread requires more memory.

These disadvantages of the thread-per-request model are usually worked around with horizontal scaling, where multiple app instances are started (Kubernetes and similar container platforms help here).
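To make the pool limit concrete, here is a minimal sketch that uses only the JDK. The pool size of 2, the 6 simulated requests and the 50 ms sleep are illustrative assumptions that stand in for Tomcat's pool and for blocking I/O; this is not how Tomcat itself is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadPerRequestDemo {

    /** Runs `requests` blocking tasks on a pool of `poolSize` threads and
     *  returns the highest number of tasks that were in flight at once. */
    static int maxConcurrent(int poolSize, int requests) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < requests; i++) {
            pool.submit(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // simulated blocking DB or REST call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The peak can never exceed the pool size, however many requests arrive.
        System.out.println("peak concurrency: " + maxConcurrent(2, 6));
    }
}
```

While a pooled thread sleeps it does no useful work, yet it still holds its slot, so the remaining requests wait in the queue.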

Traditional REST API Design

@GetMapping("/v1/items/{id}")
public ResponseEntity<Item> getItemFromExternalService(@PathVariable Integer id) {

    // 1st call: database (synchronous, blocking)
    Price price = priceRepository.findById(id).get();

    // 2nd call: REST (synchronous, blocking)
    ResponseEntity<Location> locationEntity = restTemplate.getForEntity(locationUrl, Location.class);

    Item item = buildItem(price, locationEntity.getBody());

    return ResponseEntity.ok(item);
}

This is the imperative style of API and coding:

  • Top-down approach: each step has to wait for the response of the previous one because the call blocks, and these waits add up for every request.
  • Synchronous and blocking: the invoking thread does nothing while the call is blocked, for example while fetching database records.

So the imperative style leads to inefficient use of resources. This is the problem with the traditional REST API design.

How to fix the problems of the imperative style

Move from synchronous (blocking) to asynchronous, non-blocking calls.

Java offers these options for asynchronous code:

  1. Callbacks

    • Complex, no return value, hard to code, and lead to callback hell.
  2. Future

    • An alternative to callbacks.
    • Returns a Future instance.
    • Hard to compose multiple asynchronous operations.
  3. CompletableFuture, introduced in Java 8 to overcome the issues with Future

    • Supports a functional style, so composing multiple asynchronous operations is easy.
    • Error handling is difficult.
    • Not a great fit for asynchronous calls that return multiple items.
    • So it does not do much to improve latency.
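As a rough illustration of the CompletableFuture option, the sketch below starts two asynchronous calls and combines their results. The methods fetchPrice, fetchLocation and buildItem, and the values they return, are hypothetical stand-ins for the price lookup and the location call; they are not taken from the course code.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureDemo {

    // Hypothetical stand-in for the database price lookup.
    static CompletableFuture<Integer> fetchPrice(int id) {
        return CompletableFuture.supplyAsync(() -> 100 + id);
    }

    // Hypothetical stand-in for the remote location service.
    static CompletableFuture<String> fetchLocation(int id) {
        return CompletableFuture.supplyAsync(() -> "warehouse-" + id);
    }

    /** Starts both calls without blocking, then combines the two results. */
    static CompletableFuture<String> buildItem(int id) {
        return fetchPrice(id)
                .thenCombine(fetchLocation(id),
                        (price, location) -> "item " + id + ": price=" + price + ", location=" + location)
                // One fallback for a failure in either call.
                .exceptionally(ex -> "item " + id + ": unavailable");
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the demo, to print the result.
        System.out.println(buildItem(7).join());
    }
}
```

Each future carries a single value, so a result made of many items has to be wrapped in a collection and delivered all at once. That is the gap a 0 to N type such as Flux is meant to fill.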

Traditional REST API Design

  • Limit on the number of concurrent users.
  • Synchronous and blocking.
  • Imperative-style APIs.
  • No back pressure support.

Better API Design:

  • Asynchronous and non-blocking.
  • Move away from the thread-per-request model.
  • Use fewer threads.
  • Back pressure compatible.

Imperative programming is a synchronous, blocking communication model.

Data flow as an event-driven stream

  • One event or message for every result item from the data source. A data source can be:

    - Database
    - External service
    - File, etc.

  • One event or message for completion or error.

Reactive Programming

App -------invoke DB----------------> Database
App <----call returned immediately--- Database

App <----onNext(item)---------------- Database
App <----onNext(item)---------------- Database

App <----onComplete()---------------- Database

Summary - Data flow as an event-driven stream

onNext(item) -> data stream event
onComplete() -> completion/success event
onError() -> error event

The Reactive Streams specification has 4 interfaces

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Publisher

An interface that accepts a Subscriber instance; subscribers register with the publisher.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

A Publisher represents a data producer or data source, i.e. a database, an external service, etc.

Subscriber interface

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);           // delivers the next item
    public void onError(Throwable t);  // signalled on an error condition
    public void onComplete();          // signalled on successful completion
}

Subscription interface

public interface Subscription {
    public void request(long n);
    public void cancel();
}
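The same interfaces ship with JDK 9 and later as nested types of java.util.concurrent.Flow, so the signal order can be tried without any library. The sketch below is one possible way to do that; RecordingSubscriber is a hypothetical name, and SubmissionPublisher is the JDK's own publisher, used here in place of a database or service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSignalsDemo {

    /** Records every signal it receives, in order. */
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> signals = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) {
            signals.add("onSubscribe");
            s.request(Long.MAX_VALUE); // unbounded demand
        }
        @Override public void onNext(String item) { signals.add("onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { signals.add("onError"); done.countDown(); }
        @Override public void onComplete() { signals.add("onComplete"); done.countDown(); }
    }

    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("Spring");
            publisher.submit("Reactive");
        } // close() signals onComplete after the submitted items are delivered

        try {
            subscriber.done.await(5, TimeUnit.SECONDS); // delivery happens on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.signals);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order is onSubscribe, one onNext per item, then onComplete.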

Publisher / Subscriber Event Flow - Pub / Sub Model

Publisher (data producer) and Subscriber (data subscriber or consumer)

5 types of events

a. The subscriber invokes subscribe() on the publisher, passing itself as the argument.
b. The publisher sends a Subscription object, denoting a successful subscription.
c. The subscriber calls request(n) on the Subscription. For request(2), the publisher sends 2 onNext events.
d. The publisher sends onNext(data) to the subscriber.
e. The publisher sends onComplete() to the subscriber on completion.

The subscriber controls how many events it receives. This is called backpressure.

Publisher/Subscriber Event Flow

Subscriber -------------------subscribe()---------------------------> Publisher
Subscriber <------------------Subscription-------------------------- Publisher
Subscriber -------------------cancel()------------------------------> Publisher

  • The subscriber calls subscribe() on the publisher.
  • The publisher sends a Subscription to the subscriber.
  • The subscriber calls cancel() on the Subscription.
  • The publisher stops sending events.
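The request(n) and cancel() flow can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. RangePublisher below is a hypothetical, deliberately simplified publisher written for this illustration (single-threaded, no validation of n); the batch size of 2 and the cut-off after 4 items are arbitrary choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureDemo {

    /** Synchronous publisher of 1..count that emits only what was requested. */
    static class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant request: the loop below drains it
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; } // stop emitting
            });
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        new RangePublisher(10).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("request(2)");
                s.request(2);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext(" + item + ")");
                received++;
                if (received == 4) {
                    events.add("cancel()");
                    subscription.cancel();
                } else if (received % 2 == 0) {
                    events.add("request(2)");
                    subscription.request(2);
                }
            }

            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Although the publisher has 10 items, the subscriber sees only the 4 it asked for, and no onComplete arrives because it cancelled first.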

Processor interface

A Processor is both a Subscriber (of T) and a Publisher (of R).

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}
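A Processor sits between a publisher and a subscriber. The sketch below follows the pattern shown in the JDK documentation for SubmissionPublisher: extend it for the publishing side and implement Flow.Processor for the subscribing side. UpperCaseProcessor is a hypothetical example class, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ProcessorDemo {

    /** Subscriber on the input side, Publisher on the output side. */
    static class UpperCaseProcessor extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        private Flow.Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);
        }

        @Override
        public void onNext(String item) {
            submit(item.toUpperCase()); // republish the transformed item downstream
            upstream.request(1);        // then ask upstream for one more
        }

        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        UpperCaseProcessor processor = new UpperCaseProcessor();

        processor.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            source.submit("flux");
            source.submit("mono");
        } // closing the source completes the processor, which completes the subscriber

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```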

What is a reactive library?

An implementation of the Reactive Streams specification:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Reactive libraries

  • RxJava
  • Reactor
  • Flow class (JDK 9)

We are going to use Reactor, a Pivotal product and the recommended library to work with Spring Boot.

https://projectreactor.io/

Reactor is a fourth-generation reactive library for building non-blocking applications on the JVM, based on the Reactive Streams specification.

  • Reactor Core - fully non-blocking; interacts directly with the Java functional API, CompletableFuture, Stream and Duration.

  • Typed sequences - offers two reactive, composable APIs, Flux [N] and Mono [0|1], which extensively implement Reactive Extensions.

  • Non-blocking IO - suited to microservices architectures; offers backpressure-ready network engines for HTTP (including WebSockets), TCP and UDP.

Check "Reference Guide"

https://projectreactor.io/docs/core/release/reference/

Project Reactor Documentation

https://projectreactor.io/docs

Understanding Marble Diagrams for Reactive Streams

https://medium.com/@jshvarts/read-marble-diagrams-like-a-pro-3d72934d3ef5

Complete information about the Flux class

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

3 modules that will be covered in this course are

Project Reactor

  • reactor-core

  • reactor-test

  • reactor-netty

  • reactor-core

Flux and Mono - the reactive types of Project Reactor

Flux - represents 0 to N elements.
Mono - represents 0 to 1 element.

Once the publisher emits an error, it sends no further events to the subscriber.

The onComplete event arrives only if no error occurred.
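The rule that a stream ends with exactly one terminal signal, either onError or onComplete, can be sketched with the JDK's Flow interfaces. publisherOf is a hypothetical helper written for this illustration; it assumes unbounded demand and emits everything on the first request, which a real publisher would not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class TerminalSignalDemo {

    /** Emits every item, then exactly one terminal signal.
     *  Simplified: assumes unbounded demand and emits everything on the first request. */
    static Flow.Publisher<String> publisherOf(List<String> items, Throwable failure) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean terminated;

            @Override
            public void request(long n) {
                if (terminated) return;
                terminated = true;
                for (String item : items) {
                    subscriber.onNext(item);
                }
                if (failure != null) {
                    subscriber.onError(failure); // terminal: nothing follows
                } else {
                    subscriber.onComplete();     // terminal: nothing follows
                }
            }

            @Override
            public void cancel() { terminated = true; }
        });
    }

    /** Subscribes and returns the signals seen; pass null for a stream without error. */
    static List<String> run(Throwable failure) {
        List<String> events = new ArrayList<>();
        publisherOf(List.of("Spring", "Spring Boot"), failure).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError(" + t.getMessage() + ")"); }
            @Override public void onComplete() { events.add("onComplete()"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(new RuntimeException("boom")));
        System.out.println(run(null));
    }
}
```

With a failure the subscriber sees the two items followed by onError and never onComplete; without one it sees the two items followed by onComplete.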

Reactive Test

Testing is different from the imperative way: we assert on the publisher.

Unit test for Flux

For complete tests, see: https://github.com/reactor/reactor-core/tree/master/reactor-test/src/test/java/reactor/test

For StepVerifier:

https://github.com/reactor/reactor-core/blob/master/reactor-test/src/test/java/reactor/test/StepVerifierTests.java

Example 1:

Flux<String> stringFlux = Flux.just("Spring", "Spring Boot", "Reactive Spring")
        .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
        .log();

StepVerifier.create(stringFlux)                                  // returns FirstStep
        .expectNext("Spring", "Spring Boot", "Reactive Spring")  // returns Step
        .expectErrorMessage("Exception Occurred")                // returns StepVerifier
        .verify();

Example 2:

@Test
public void expectNext() {
	Flux<String> flux = Flux.just("foo", "bar");

	StepVerifier.create(flux)
	            .expectNext("foo")
	            .expectNext("bar")
	            .expectComplete()
	            .verify();
}

fromIterable, with an optional log() to see what happens behind the scenes

private static List<String> words = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");

  @Test
  public void simpleCreation() {
     
     Flux<String> manyWords = Flux.fromIterable(words).log();
     manyWords.subscribe(System.out::println);
  
}

Since log() is added, we get to see the events:

2020-11-25 14:47:48.361 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
2020-11-25 14:47:48.363 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | request(unbounded)
2020-11-25 14:47:48.364 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(the)
the
2020-11-25 14:47:48.364 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(quick)
quick
2020-11-25 14:47:48.365 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(brown)
brown
2020-11-25 14:47:48.365 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(fox)
fox
2020-11-25 14:47:48.365 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(jumped)
jumped
2020-11-25 14:47:48.365 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(over)
over
2020-11-25 14:47:48.366 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(the)
the
2020-11-25 14:47:48.366 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(lazy)
lazy
2020-11-25 14:47:48.367 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onNext(dog)
dog
2020-11-25 14:47:48.368 INFO 6080 --- [main] reactor.Flux.Iterable.1 : | onComplete()

fromArray

private static String[] stringWords = new String[] {"the","quick"};

  @Test
  public void simpleStringCreation() {
     
     Flux<String> manyWords = Flux.fromArray(stringWords).log();
     manyWords.subscribe(System.out::println);
  
}

flatMap

If you need to make a DB call for every element of a list, and that call returns a Flux, use flatMap.

Flux.just("red", "white", "blue")
    .log()
    .flatMap(s -> {
        // Pass each element to the DB (or any other source) and return the records as a Flux
        return Flux.fromIterable(convertToList(s));
    });

Handle Exception

Spring Support for reactive

https://spring.io/reactive

Project Reactor documentation: https://projectreactor.io/docs and https://projectreactor.io/docs/core/release/reference/index.html

Reactive RESTful app

Maven

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

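Controller - producing MediaType.APPLICATION_STREAM_JSON_VALUE returns the response as a stream, in a non-blocking fashion. (Spring Framework 5.3 deprecates this constant in favor of MediaType.APPLICATION_NDJSON_VALUE.)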

@GetMapping(value = "/stream",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Price> priceStream(){
    return Flux.interval(Duration.ofMillis(500))
            .map(l -> new Price(System.currentTimeMillis(),ThreadLocalRandom.current().nextInt(100, 125)))
            .log();
}

Spring WebFlux - Functional Web

Use functions to route the request and response: RouterFunction and HandlerFunction.

Router Function

  • Used to route the incoming request.
  • Similar in functionality to the @RequestMapping annotation.

Handler Function

Works with ServerRequest and ServerResponse:

  • ServerRequest represents the HTTP request.
  • ServerResponse represents the HTTP response.

Build a Non Blocking API using Handler and Router Functions

  1. Write the Handler Function

@Component
public class SampleHandlerFunction {

    public Mono<ServerResponse> flux(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Flux.just(1, 2, 3, 4).log(), Integer.class);
    }
}

  2. Write the Router Function (maps an incoming request to the appropriate handler)

@Configuration
public class RouterFunctionConfig {

    // GET and accept are static imports from RequestPredicates
    @Bean
    public RouterFunction<ServerResponse> route(SampleHandlerFunction handlerFunction) {
        return RouterFunctions.route(
                GET("/functional/flux").and(accept(MediaType.APPLICATION_JSON)),
                handlerFunction::flux);
    }
}

Spring WebFlux - Behind the Scenes

Request/response --> Netty (non-blocking server; event loop from reactor-netty) <--2--> Reactive Streams Adapter <--3--> WebFilter <--4--> WebHandler (DispatcherHandler)
WebHandler (DispatcherHandler) --5.1--> Controller
WebHandler (DispatcherHandler) --5.2--> Functional web

Events happening between client and server

  • onSubscribe - the subscription between the browser (client) and the server is established.
  • request(unbounded) - the browser asks the server to send all the elements it has.
  • onNext(1) - each element is sent as an event.
  • onComplete - signals the browser that the stream is finished, so the connection can be closed.

Browser ---------/flux---------------> Spring WebFlux application (embedded Netty) + API
Browser <--------promise(Flux)-------- Application
Browser ---------request(unbounded)--> Application
Browser <--------onNext(1)------------ Application
Browser <--------onNext(2)------------ Application
Browser <--------onNext(3)------------ Application
Browser <--------onNext(4)------------ Application
Browser <--------onComplete()--------- Application

  • The browser sends /flux and gets back a promise (Flux).
  • The browser sends request(unbounded), meaning: give me all the data you have on that Flux.
  • The browser receives every element as an onNext event, as a stream.
  • The browser gets onComplete from the server, meaning the stream on the Flux is complete, so the connection can be closed.

Netty

Netty is an asynchronous, event-driven network application framework for rapid development of maintainable, high-performance protocol servers and clients.

Netty is built on top of Java.

Netty is asynchronous

Non-blocking client ------------------request----------> Netty
Non-blocking client <-----------------Future------------ Netty

Netty returns a Future, which says that you will receive the data, just a little later.

  • Being asynchronous frees us from blocking calls.
  • Handles a large number of connections (HTTP).

Events in Netty

  • A client requesting a new connection is treated as an event.
  • A client requesting data is treated as an event.
  • A client posting data is treated as an event.
  • Errors are treated as events.

Netty - Channel

Channel - represents the connection between the client and the server. It is non-blocking.

Client ---inbound events---> Server
Client <--outbound events--- Server

Events going from the client to the server are inbound events; events going from the server to the client are outbound events.

Netty - Channel + Events

Inbound events:

  • Requesting data
  • Posting data, etc.

Outbound events:

  • Opening or closing a connection
  • Sending a response to the client

Netty - Event loop

An event loop is a loop that looks for events. Each event loop is registered with a single dedicated thread.
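The idea of one loop bound to one dedicated thread can be sketched with a queue and a thread from the JDK. This is only an illustration of the concept: EventLoopSketch is a hypothetical class, and a real Netty event loop also multiplexes the I/O of its channels, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class EventLoopSketch {
    private static final Runnable STOP = () -> { }; // marker that ends the loop

    private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::loop, "event-loop-1");

    void start() { thread.start(); }

    /** Any thread may enqueue an event; only the loop thread runs it. */
    void submit(Runnable event) { events.add(event); }

    /** Enqueues the stop marker and waits for the loop thread to finish. */
    void shutdown() {
        events.add(STOP);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        try {
            while (true) {
                Runnable event = events.take(); // waits for the next event
                if (event == STOP) return;
                event.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> run() {
        List<String> handledBy = new ArrayList<>(); // touched only by the loop thread until join()
        EventLoopSketch loop = new EventLoopSketch();
        loop.start();
        for (String name : List.of("connect", "read", "write")) {
            loop.submit(() -> handledBy.add(name + " on " + Thread.currentThread().getName()));
        }
        loop.shutdown();
        return handledBy;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every event is handled in submission order by the same thread, which is why handlers on an event loop must not block.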

Channel lifecycle

Channel created -> channel registered with event loop -> channel is active -> channel is inactive -> channel unregistered

  • Channel registered with event loop - needed so that the event loop can forward events to the right channel in the future.
  • Channel is active - client and server are connected and ready to send and receive events.
  • Channel is inactive - client and server are not connected.
  • Channel unregistered - the channel is unregistered from the event loop.

EventLoop

Is it possible to have more than one event loop? Yes.

Number of event loops = 2 x number of processors available to the JVM (Netty's default).

How to get the number of processors: Runtime.getRuntime().availableProcessors()
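The calculation above can be checked on any machine with a few lines of plain Java. EventLoopCount and defaultEventLoops are hypothetical names used only for this sketch; the method applies the 2 x processors rule from the notes and does not read Netty's actual configuration.

```java
class EventLoopCount {

    /** Applies the rule from the notes: twice the processors visible to the JVM. */
    static int defaultEventLoops() {
        return 2 * Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("event loops (2 x processors): " + defaultEventLoops());
    }
}
```

In a container, availableProcessors() reflects the CPU limit the JVM detects, so the number can be lower than the host's core count.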

EventLoopGroup

Netty uses 2 event loop groups: one accepts new connections, and the other handles the events on the channels of the accepted connections.
