RxJS Essentials - rohit120582sharma/Documentation GitHub Wiki

Table of contents

  • Introduction
  • Observable
    • It emits events/data packages depending on the data source. It is used to handle asynchronous tasks. It follows the observer pattern.
  • Observer
    • It is the function (or object) passed to subscribe() that handles the emitted events/data packages. An observer can handle three things:
      • Handle Data
      • Handle Error
      • Handle Completion
  • Subscribe & Unsubscribe
  • Subject
    • A subject is an observable and an observer at the same time, so you can subscribe() to it and also emit data by calling its next() method. It can be used for cross-component communication in place of EventEmitter.
  • Operators
    • Operators work only on observables. An operator lets you transform the data you receive from one observable into something else and returns a new observable with the transformed data.

Introduction

Reactive programming combines concepts and techniques from functional programming to offer a new approach to asynchronous programming.

RxJS stands for Reactive Extensions for JavaScript.

RxJS is a library for reactive programming using Observables to make it easier to compose asynchronous or callback-based code.

RxJS is about pushing data to the subscribers over time.


Setup

You can install the RxJS library from npm:

npm install rxjs

To import the entire core set of functionality:

import Rx from 'rxjs/Rx';
Rx.Observable.of(1,2,3);

To import only what you need by patching the Observable prototype (the RxJS 5 import style used throughout this page):

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';

Observable.of(1,2,3).map(x => x + '!!!');

Basic Terms

In the reactive style of coding (as opposed to the imperative one), changes in data drive the invocation of your code.

We want to observe data, which means that there is some data producer: a server sending data over HTTP or WebSockets, a UI input field where the user enters some data, an accelerometer in a smartphone, and so on. An observable is a function (or an object) on the client that gets the producer’s data and pushes it to the subscriber(s). An observer is an object (or a function) that knows how to handle the data elements pushed by the observable.

The main players of RxJS

Observable

It is a data stream that pushes data over time. An observable gets data from some data source (a socket, an array, UI events) one element at a time. To be precise, an observable knows how to do three things:

  • Emit the next element to the observer
  • Throw an error on the observer
  • Inform the observer that the stream is over

Observer

The observer is a consumer of observable values. It is an object/function which knows how to handle the data. An observer object provides up to three callbacks:

  • The function to handle the next element emitted by the observable
  • The function to handle errors thrown by the observable
  • The function to handle the end of stream

Subscriber

The subscriber connects an observable and observer by invoking the method subscribe() and disconnects them by invoking unsubscribe().
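The contract between these three roles can be sketched without the library. The following is a minimal, dependency-free model of the idea, not the RxJS implementation; createObservable, numbers, keys, and push are hypothetical names introduced only for this illustration.

```javascript
// createObservable is a hypothetical helper for this sketch, not an RxJS API.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      // Guarded observer: nothing is delivered after error, completion, or unsubscribe.
      const guarded = {
        next(value) {
          if (!closed && observer.next) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(err);
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
        },
      };
      producer(guarded);
      return { unsubscribe() { closed = true; } };
    },
  };
}

const received = [];

// A finite stream: two values, then completion.
const numbers = createObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // ignored, the stream is already over
});
numbers.subscribe({
  next: (value) => received.push(`next ${value}`),
  error: (err) => received.push(`error ${err}`),
  complete: () => received.push("complete"),
});

// An open-ended stream that we feed by hand to show unsubscribe().
let push = null;
const keys = createObservable((observer) => {
  push = (value) => observer.next(value);
});
const subscription = keys.subscribe({ next: (value) => received.push(`key ${value}`) });
push("a");
subscription.unsubscribe();
push("b"); // ignored, the subscriber has disconnected

console.log(received); // [ 'next 1', 'next 2', 'complete', 'key a' ]
```

The observer here supplies up to three callbacks, and the object returned by subscribe() is what disconnects it again.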

Operator

It is a function that can be applied to filter or transform values en route from the observable to the observer.

Subject

Acts as both an Observable and an Observer, and can have multiple observers subscribed to it.


Creating Observables

There are different ways of turning a data piece or data source into an Observable:

  • Observable.from([1,2,3,4,5])

    • converts an array or iterable into an Observable. You can also pass any iterable data collection, or the iterator returned by calling a generator function, as the argument of from()
  • Observable.create(subscribeFn)

    • returns an Observable that calls subscribeFn for each subscriber, passing it the Observer so that it can invoke next(), error(), and complete() on it
  • Observable.fromEvent(myInput, 'keyup')

    • converts an event into an Observable
  • Observable.fromPromise(myPromise)

    • converts a Promise into an Observable
  • Observable.of(1,2,3)

    • turns the sequence of numbers into an Observable
  • Observable.range(0, 100)

    • returns a sequence of 100 consecutive integers (0,1,2,3…99), given a start value and a count
  • Observable.interval(100)

    • returns a sequence of integers (0,1,2,3…), emitting one every 100 milliseconds

Rx.Observable.of(1,2,3) 
.subscribe(
    value => console.log(value),		// will be invoked for each element emitted by the observable
    err => console.error(err),			// will be invoked in case of an error 
    () => console.log("Streaming is over")	// will be invoked when the observable stream is over
);

Using Observable.create()

An observer is an object that implements one or more of these functions: next(), error(), and complete().

You can create an observable with the create() method, passing it a function whose parameter represents an observer. When the observable is created, it doesn’t yet know which concrete observer will be provided.

At the time of subscription, we’ll provide a concrete observer to our observable.

function getObservableBeer(){
	return Rx.Observable.create(observer => {
			const beers = [
				{name: "Stella", country: "Belgium", price: 9.50},
				{name: "Sam Adams", country: "USA", price: 8.50},
				{name: "Bud Light", country: "USA", price: 6.50},
				{name: "Brooklyn Lager", country: "USA", price: 8.00},
				{name: "Sapporo", country: "Japan", price: 7.50}
			];
			beers.forEach( beer => observer.next(beer));
			observer.complete();
		}
	);
}

getObservableBeer()
	.subscribe(
		beer => console.log("Subscriber got " + beer.name),
		error => console.error(error),
		() => console.log("The stream is over")
	);

Intro to RxJS Operators and map, filter, and reduce operators

As the data elements flow from the observable to the observer, you can apply one or more operators, transforming each element prior to supplying it to the observer. Each operator takes an observable as an input, performs its action, and returns a new observable as an output:

Since each operator takes an observable in and creates an observable as its output, operators can be chained so each observable element can go through several transformations prior to being handed to the observer.

The map() operator

The map() operator transforms one value to another. It takes a given value from the observable stream and applies the provided transforming function to it.

The filter() operator

The filter() operator takes a function predicate as an argument, which returns true if the emitted value meets the criteria, or false otherwise. Only the values that meet the criteria will make it to the observer.

The reduce() operator

The reduce() operator lets you aggregate the values emitted by an observable into a single value, which is emitted when the source observable completes.
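To make the "observable in, observable out" idea concrete, here is a dependency-free sketch of how such operators could be built and chained. It is an illustration under stated assumptions, not the RxJS source: fromArray, map, filter, reduce, and pipe are local stand-ins defined in the block, the beer data is sample data, and error handling is left out for brevity.

```javascript
// Local stand-ins (assumptions for illustration), not the RxJS implementations.
const fromArray = (items) => ({
  subscribe(observer) {
    items.forEach((item) => observer.next(item));
    observer.complete();
  },
});

// Each operator takes a source observable and returns a new observable.
const map = (fn) => (source) => ({
  subscribe(observer) {
    source.subscribe({
      next: (value) => observer.next(fn(value)),
      complete: () => observer.complete(),
    });
  },
});

const filter = (predicate) => (source) => ({
  subscribe(observer) {
    source.subscribe({
      next: (value) => { if (predicate(value)) observer.next(value); },
      complete: () => observer.complete(),
    });
  },
});

// reduce() emits one aggregated value, and only once the source completes.
const reduce = (accumulate, seed) => (source) => ({
  subscribe(observer) {
    let total = seed;
    source.subscribe({
      next: (value) => { total = accumulate(total, value); },
      complete: () => { observer.next(total); observer.complete(); },
    });
  },
});

// Applies the operators left to right, feeding each one the previous result.
const pipe = (source, ...operators) => operators.reduce((acc, op) => op(acc), source);

const beers = fromArray([
  { name: "Stella", country: "Belgium", price: 9.5 },
  { name: "Sam Adams", country: "USA", price: 8.5 },
  { name: "Bud Light", country: "USA", price: 6.5 },
]);

const results = [];
pipe(
  beers,
  filter((beer) => beer.price < 9),       // keep the cheaper beers
  map((beer) => beer.price),              // extract the price
  reduce((sum, price) => sum + price, 0)  // add the prices up
).subscribe({
  next: (value) => results.push(value),
  complete: () => results.push("done"),
});

console.log(results); // [ 15, 'done' ]
```

With the patch-style API used on this page, the same chain would be written as method calls on the observable (.filter(...).map(...).reduce(...)).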


Combining RxJS Observables

The concat() operator

concat() subscribes to the next observable only when the previous one completes. It's useful for sequential processing, e.g. HTTP requests that must run one after another.

// Emulate first HTTP request that takes 3 sec
let threeSecHTTPRequest = Observable.timer(3000).mapTo('First response');
// Emulate second HTTP request that takes 1 sec
let oneSecHTTPRequest = Observable.timer(1000).mapTo('Second response');
// Subscription
Observable
	.concat(threeSecHTTPRequest, oneSecHTTPRequest)
	.subscribe(res=>console.log(res));

The merge() operator

merge() combines observables of the same type into one. It emits values from all of them as they arrive, so values from different observables may interleave.

// Emulate first HTTP request that takes 3 sec
let threeSecHTTPRequest = Observable.timer(3000).mapTo('First response');
// Emulate second HTTP request that takes 1 sec
let oneSecHTTPRequest = Observable.timer(1000).mapTo('Second response');
// Subscription
Observable
	.merge(threeSecHTTPRequest, oneSecHTTPRequest)
	.subscribe(res=>console.log(res));

The flatMap operator

In some cases, you need to treat each item emitted by an observable as another observable. In other words, the outer observable emits the inner observables. Does it mean that we need to write nested subscribe() calls (one for the outer observable and another for the inner one)? No, we don’t. The flatMap() operator takes each item from the outer observable and auto-subscribes to it.

The flatMap() is used to “transform the items emitted by an observable into observables, then flatten the emissions from those into a single observable”.

In RxJS, flatMap() is an alias of mergeMap() so these two operators have the same functionality.

The flatMap() operator takes each item emitted by the outer observable, which is itself an inner observable, and unwraps its contents into a single flattened output stream. Because flatMap() merges the emissions of the inner observables, their items may interleave.

function getDrinks(){
	let beers = Rx.Observable.from([
			{name: "Stella", country: "Belgium", price: 9.50},
			{name: "Sam Adams", country: "USA", price: 8.50},
			{name: "Bud Light", country: "USA", price: 6.50}
		], Rx.Scheduler.async);
	let softDrinks = Rx.Observable.from([
			{name: "Coca Cola", country: "USA", price: 1.50},
			{name: "Fanta", country: "USA", price: 1.50},
			{name: "Lemonade", country: "France", price: 2.50}
		], Rx.Scheduler.async);

	return Rx.Observable.create(
		(observer)=>{
			observer.next(beers);
			observer.next(softDrinks);
			observer.complete();
		}
	);
}

// We want to "unload" each pallet and print each drink's info
getDrinks()
	.flatMap(drinks=>drinks)
	.subscribe(
		(drink)=>{ console.log("Subscriber got " + drink.name + ": " + drink.price); },
		(error)=>{ console.error(error); },
		()=>{ console.log("The stream of drinks is over"); }
	);

// A second example: each outer value is mapped to a timed inner observable
let outer = Rx.Observable.interval(1000).take(2);
let combined = outer.flatMap((x)=>{
	return Rx.Observable
		.interval(400)
		.take(3)
		.map((y)=>{
			return `outer ${x}: inner ${y}`;
		});
});
combined.subscribe((result)=>{
	console.log(`${result}`);
});
/*
outer 0: inner 0
outer 0: inner 1
outer 0: inner 2
outer 1: inner 0
outer 1: inner 1
outer 1: inner 2
*/

Another use of the flatMap() operator is executing more than one HTTP request, where the result of the first request is passed to the second one. Since flatMap() is a special case of map(), you can specify a transforming function while flattening the observables into a common stream.

this.httpClient
	.get('/customers/123')
	.flatMap((customer)=>{
		return this.httpClient.get(customer.orderURL);
	})
	.subscribe((response)=>{
		this.order = response;
	});

The switchMap operator

While flatMap() unwraps and merges all the values emitted by the outer observable, the switchMap() operator handles the values from the outer observable but cancels the inner subscription being processed if the outer observable emits a new value.

let outer = Rx.Observable.interval(1000).take(2);
let combined = outer.switchMap((x)=>{
	return Rx.Observable
		.interval(400)
		.take(3)
		.map((y)=>{
			return `outer ${x}: inner ${y}`;
		});
});
combined.subscribe((result)=>{
	console.log(`${result}`);
});

// The third value from the first inner observable (due at 2200 ms) never reaches the resulting stream, because the outer observable emits again at 2000 ms
/*
outer 0: inner 0
outer 0: inner 1
outer 1: inner 0
outer 1: inner 1
outer 1: inner 2
*/

Another use of the switchMap() operator is handling a search box, where an HTTP request is executed on every keyup. If the user enters a third character while the HTTP request for the second one is still pending, the pending request is cancelled.

<input type="text" placeholder="Enter City" [formControl]="searchInput">

this.searchInput.valueChanges
	.debounceTime(200)
	.switchMap((city)=>{
		return this.getWeather(city);
	})
	.subscribe(
		(res)=>{
			this.temperature = res.temperature;
		},
		(err)=>{
			console.log(`Can't get weather.`);
		}
	);

getWeather(city){
	return this.http
		.get(this.baseWeatherURL + city + this.urlSuffix)
		.map((res)=>{
			return res.json();
		});
}
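The cancellation behavior can be modeled without timers or a real server. The sketch below is a simplified, dependency-free illustration and not how RxJS implements switchMap(); fakeGetWeather, respond, switchMapInto, and the pending map are hypothetical helpers that stand in for the HTTP layer and the operator.

```javascript
// Hypothetical in-memory "network": city -> callback that delivers the response.
const pending = new Map();
const cancelled = [];

// fakeGetWeather stands in for an HTTP call; it is not part of RxJS or Angular.
function fakeGetWeather(city) {
  return {
    subscribe(observer) {
      pending.set(city, (response) => observer.next(response));
      return {
        unsubscribe() {
          pending.delete(city);
          cancelled.push(city);
        },
      };
    },
  };
}

// Simulates the server answering a request, if that request is still pending.
function respond(city, temperature) {
  const deliver = pending.get(city);
  if (deliver) deliver({ city, temperature });
}

// Local stand-in for switchMap(): only the latest inner subscription stays alive.
function switchMapInto(project, observer) {
  let inner = null;
  return function onOuterValue(value) {
    if (inner) inner.unsubscribe(); // cancel the request still in flight
    inner = project(value).subscribe(observer);
  };
}

const results = [];
const onCityTyped = switchMapInto(fakeGetWeather, {
  next: (res) => results.push(`${res.city}: ${res.temperature}`),
});

onCityTyped("Lo");   // request for "Lo" starts
onCityTyped("Lon");  // "Lo" is cancelled, request for "Lon" starts
respond("Lo", 10);   // dropped: nobody is waiting for this response any more
respond("Lon", 12);

console.log(results);   // [ 'Lon: 12' ]
console.log(cancelled); // [ 'Lo' ]
```

Only the response for the latest input reaches the subscriber, which is why switchMap() suits type-ahead searches.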

Error Handling in RxJS

The Reactive Manifesto declares that a reactive app should implement a procedure that keeps it alive in case of a failure.

A subscription to an observable ends if one of the following occurs:

  • The consumer explicitly unsubscribes
  • The observable invokes the complete() method on the observer
  • The observable invokes the error() method on the observer

RxJS offers several operators to intercept and handle the error before it reaches the code in the error() method on the observer.

  • catch(error) - intercepts the error so that you can implement some business logic to handle it, such as switching to another observable
  • retry(n) - retries the erroneous operation up to n times
  • retryWhen(fn) - retries the erroneous operation as per the provided function

// Declaring three data sources
function getData():Observable{...}			// Data source 1
function getCachedData():Observable{...}		// Data source 2
function getDataFromAnotherService():Observable{...}	// Data source 3

// Invoking and subscribing
getData()
	.catch((err)=>{
		if(err.status === 500){
			return getCachedData();
		}else{
			return getDataFromAnotherService();
		}
	})
	.map((beer)=>{
		return `${beer.name}, ${beer.country}`;
	})
	.subscribe((beer)=>{
		console.log(`Subscriber got ${beer}`);
	});
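The example above covers catch(). The idea behind retry(n), resubscribing to the source when it fails, can be sketched in a dependency-free way as follows. This is an illustration only: flakySource is a hypothetical source that fails twice before succeeding, and retry here is a local stand-in, not the RxJS operator.

```javascript
// Hypothetical flaky source: fails on the first two subscriptions, then succeeds.
let attempts = 0;
const flakySource = {
  subscribe(observer) {
    attempts += 1;
    if (attempts < 3) {
      observer.error(new Error(`attempt ${attempts} failed`));
    } else {
      observer.next("data");
      observer.complete();
    }
  },
};

// Local stand-in for retry(n): on error, resubscribe to the source up to n times.
const retry = (n) => (source) => ({
  subscribe(observer) {
    let remaining = n;
    const attempt = () =>
      source.subscribe({
        next: (value) => observer.next(value),
        error: (err) => {
          if (remaining > 0) {
            remaining -= 1;
            attempt(); // try again instead of telling the observer
          } else {
            observer.error(err); // out of retries: pass the error on
          }
        },
        complete: () => observer.complete(),
      });
    attempt();
  },
});

const log = [];
retry(2)(flakySource).subscribe({
  next: (value) => log.push(`next ${value}`),
  error: (err) => log.push(`error ${err.message}`),
  complete: () => log.push("complete"),
});

console.log(log);      // [ 'next data', 'complete' ]
console.log(attempts); // 3
```

The observer never sees the first two failures; its error callback would run only if the source kept failing after the allowed retries.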

Hot and Cold Observables

Introduction

There are two types of observables: hot and cold.

  • Cold
    • A cold observable creates a new data producer for each subscriber.
    • A cold observable starts producing data when some code invokes subscribe() on it. For example, your app may declare an observable that fetches certain products from a URL on the server; the request is sent only when a subscriber subscribes.
  • Hot
    • A hot observable creates a data producer first, and each subscriber gets the data from one producer starting from the moment of subscription.
    • A hot observable produces data even if no subscribers are interested in it. For example, the accelerometer of your smartphone produces data about the position of your device even if there is no app subscribed to this data.
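The difference comes down to where the producer is created. The following dependency-free sketch models both cases; cold, hot, emit, and listeners are hypothetical names for this illustration and do not correspond to RxJS APIs.

```javascript
// Cold: the producer is created inside subscribe(), one per subscriber.
let producersCreated = 0;
const cold = {
  subscribe(observer) {
    producersCreated += 1;
    const id = producersCreated;
    [1, 2].forEach((n) => observer.next(`producer ${id}: ${n}`));
  },
};

// Hot: a single producer exists up front and emits whether or not anyone listens.
const listeners = new Set();
const emit = (value) => listeners.forEach((listener) => listener(value));
const hot = {
  subscribe(observer) {
    const listener = (value) => observer.next(value);
    listeners.add(listener);
    return { unsubscribe: () => listeners.delete(listener) };
  },
};

const coldLog = [];
cold.subscribe({ next: (value) => coldLog.push(`A ${value}`) });
cold.subscribe({ next: (value) => coldLog.push(`B ${value}`) });

const hotLog = [];
emit("x"); // nobody is subscribed yet, so this value is lost
hot.subscribe({ next: (value) => hotLog.push(`A got ${value}`) });
emit("y");
hot.subscribe({ next: (value) => hotLog.push(`B got ${value}`) });
emit("z");

console.log(coldLog);
// [ 'A producer 1: 1', 'A producer 1: 2', 'B producer 2: 1', 'B producer 2: 2' ]
console.log(hotLog);
// [ 'A got y', 'A got z', 'B got z' ]
```

Each cold subscriber replays the full sequence from its own producer, while hot subscribers only see what is emitted after they join.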

Share operator

The share operator allows multiple subscribers to share the same observable. It converts a cold observable into a hot one.

let numbers = Observable
		.interval(1000)
		.take(3)
		.share(); // Make it hot

function subscribeToNumbers(name){
	numbers.subscribe(
		(x)=>{ console.log(`${name} got ${x}`); }
	);
}
subscribeToNumbers('Subscriber 1');

setTimeout(()=>{
	subscribeToNumbers('Subscriber 2'); // Second subscriber joins in 2.5 sec
}, 2500);

/* ---------------- Console ----------------
Subscriber 1 got 0
Subscriber 1 got 1
Subscriber 1 got 2
Subscriber 2 got 2
*/

data1:string;
data2:string;

let keyup:Observable<any> = Observable.fromEvent(this.myInputField.nativeElement, 'keyup');
let keyupValue = keyup
			.map(event => event.target.value )
			.share(); // Make it hot

keyupValue
	.subscribe(value => this.data1=value);

keyupValue
	.sample(Observable.interval(3000))
	.subscribe(value => this.data2=value);

Multicasting with RxJS Subject

An RxJS Subject is an object that is both an observable and an observer. This means that you can push data to its observers using next() as well as subscribe to it. A Subject can have multiple observers, which makes it useful for multicasting: emitting a value to multiple subscribers.

let orders = new Subject();

let stockExchange = orders.subscribe(
	(order)=>{
		console.log(`Sending to stock exchange`);
	}
);
let tradeCommission = orders.subscribe(
	(order)=>{
		console.log(`Reporting to trade commission`);
	}
);

orders.next(123);
tradeCommission.unsubscribe();
orders.next(456);

/* ---------------- Console ----------------
Sending to stock exchange
Reporting to trade commission
Sending to stock exchange
*/

Multicasting with RxJS BehaviorSubject

  • BehaviorSubject is a type of Subject
  • Represents a value that changes over time
  • Every new subscriber immediately receives the initial value or the latest value emitted by the BehaviorSubject
  • Examples: login status, state
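The "latest value on subscription" behavior can be modeled in a few lines. This is a dependency-free sketch of the idea, not the RxJS class; TinyBehaviorSubject and loginStatus are hypothetical names used only here, and observers are plain callbacks for brevity.

```javascript
// Local stand-in (assumption for illustration), not the RxJS BehaviorSubject.
class TinyBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = new Set();
  }

  // Stores the new value and pushes it to every current observer.
  next(value) {
    this.value = value;
    this.observers.forEach((observer) => observer(value));
  }

  // Registers the observer and immediately replays the current value to it.
  subscribe(observer) {
    this.observers.add(observer);
    observer(this.value);
    return { unsubscribe: () => this.observers.delete(observer) };
  }
}

const loginStatus = new TinyBehaviorSubject("logged out");
const seen = [];

const first = loginStatus.subscribe((status) => seen.push(`first: ${status}`));
loginStatus.next("logged in");

// A late subscriber still learns the current state right away.
const second = loginStatus.subscribe((status) => seen.push(`second: ${status}`));

first.unsubscribe();
loginStatus.next("logged out");

console.log(seen);
// [ 'first: logged out', 'first: logged in', 'second: logged in', 'second: logged out' ]
```

A plain Subject would not replay anything to the second subscriber, which is the main reason to prefer a BehaviorSubject for state such as login status.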

/* app.state.service.ts */

export class AppStateService{
	public stateEvent:BehaviorSubject<string> = new BehaviorSubject('');

	constructor(){
	}
	set searchCriteria(value:string){
		this.stateEvent.next(value);
	}
}

/* app.search.component.ts */

export class AppSearchComponent{
	private searchInput:FormControl = new FormControl(''); // initialized here so valueChanges exists when the constructor runs

	constructor(private stateService:AppStateService){
		this.searchInput
			.valueChanges
			.debounceTime(300)
			.do(value=>console.log(`The user entered ${value}`))
			.subscribe(
				(searchValue)=>{
					this.stateService.searchCriteria = searchValue;
				}
			);
	}
}

/* app.ebay.component.ts */

export class AppEbayComponent implements OnDestroy{
	private searchFor:string;
	private subscription:Subscription;

	constructor(private stateService:AppStateService){
		this.subscription = this.stateService
					.stateEvent
					.subscribe(
						(event)=>{
							this.searchFor = event;
						}
					);
	}
	ngOnDestroy(){
		this.subscription.unsubscribe();
	}
}