Rxjs Essentials - ajayupreti/Tech.Blogs GitHub Wiki

RxJS

  • RxJS is a library for reactive programming using Observables to make it easier to compose asynchronous or callback-based code.
  • RxJS is about pushing data to the subscribers over time.

The main players of RxJS

Observable

It is a data stream that pushes data over time. An observable gets data from some data source (a socket, an array, UI events) one element at a time. To be precise, an observable knows how to do three things:

  • Emit the next element to the observer
  • Signal an error to the observer
  • Inform the observer that the stream is over

Observer

The observer is a consumer of observable values. It is an object/function which knows how to handle the data. An observer object provides up to three callbacks:

  • The function to handle the next element emitted by the observable
  • The function to handle errors thrown by the observable
  • The function to handle the end of stream

Subscriber

The subscriber connects an observable and observer by invoking the method subscribe() and disconnects them by invoking unsubscribe().
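The roles described above can be sketched without the library. The snippet below is a simplified stand-in, not the RxJS implementation: `MiniObservable`, `MiniObserver` and `MiniSubscription` are hypothetical names chosen for illustration. It shows the three notifications (next, error, complete) and how `subscribe()` and `unsubscribe()` connect and disconnect the two sides.

```typescript
// Simplified stand-ins for the RxJS types; all names here are hypothetical.
interface MiniObserver<T> {
  next: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

interface MiniSubscription {
  unsubscribe: () => void;
}

class MiniObservable<T> {
  // The producer function runs once per subscribe() call.
  constructor(private producer: (observer: MiniObserver<T>) => void) {}

  subscribe(observer: MiniObserver<T>): MiniSubscription {
    let active = true;
    // Wrap the observer so that nothing is delivered after
    // unsubscribe(), error() or complete().
    this.producer({
      next: (value) => { if (active) observer.next(value); },
      error: (err) => { if (active) { active = false; observer.error?.(err); } },
      complete: () => { if (active) { active = false; observer.complete?.(); } },
    });
    return { unsubscribe: () => { active = false; } };
  }
}

const trace: string[] = [];
const numbers = new MiniObservable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete?.();
  observer.next(3); // ignored: the stream is already over
});

const subscription = numbers.subscribe({
  next: (value) => trace.push(`next ${value}`),
  error: (err) => trace.push(`error ${String(err)}`),
  complete: () => trace.push("complete"),
});
subscription.unsubscribe();

console.log(trace); // ["next 1", "next 2", "complete"]
```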

Creating Observables

There are different ways of turning a data piece or data source into an Observable:

1. Observable.from([1,2,3,4,5])

converts an array or iterable into an Observable. You can pass any iterable data collection, or the iterator returned by calling a generator function, as the argument of from()

2. Observable.create(subscribeFn)

returns an Observable that runs the function you supply on each subscription; that function receives the Observer and invokes next(), error() and complete() on it

3. Observable.fromEvent(myInput, 'keyup')

converts an event into an Observable

4. Observable.fromPromise(myPromise)

converts a Promise into an Observable

5. Observable.of(1,2,3)

turns the sequence of numbers into an Observable

6. Observable.range(0, 100)

returns a sequence of consecutive integers; the arguments are the start value and the count, so this emits 0, 1, 2, … 99

7. Observable.interval(100)

returns a sequence of integers (0,1,2,3…), one every 100 milliseconds (the argument is the period in milliseconds)
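To make the idea behind from() concrete, here is a minimal sketch that pushes the elements of an iterable to an observer and then signals completion. It is an illustration only, not how RxJS implements from(): `fromIterable`, `SimpleObserver`, `countdown` and `collect` are hypothetical names.

```typescript
// Hypothetical observer shape for this sketch (no error callback).
interface SimpleObserver<T> {
  next: (value: T) => void;
  complete: () => void;
}

// Hypothetical helper: emits each element of the iterable, then completes.
function fromIterable<T>(source: Iterable<T>) {
  return {
    subscribe(observer: SimpleObserver<T>): void {
      for (const item of source) {
        observer.next(item);
      }
      observer.complete();
    },
  };
}

// A generator function: calling it returns an iterable iterator.
function* countdown(start: number): IterableIterator<number> {
  for (let n = start; n > 0; n--) {
    yield n;
  }
}

const received: string[] = [];
const collect = (label: string): SimpleObserver<number> => ({
  next: (value) => received.push(`${label}:${value}`),
  complete: () => received.push(`${label}:done`),
});

fromIterable([1, 2, 3]).subscribe(collect("array"));
fromIterable(countdown(2)).subscribe(collect("generator"));

console.log(received);
// ["array:1", "array:2", "array:3", "array:done",
//  "generator:2", "generator:1", "generator:done"]
```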

RxJS Operators

Operators are methods on the Observable type, such as .map(...), .filter(...), .merge(...), etc. When called, they do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.

The map() operator

The map() operator transforms one value to another. It takes a given value from the observable stream and applies the provided transforming function to it.

The filter() operator

The filter() operator takes a function predicate as an argument, which returns true if the emitted value meets the criteria, or false otherwise. Only the values that meet the criteria will make it to the observer.
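The following sketch shows, under simplifying assumptions, why applying map() and filter() leaves the source untouched: each operator wraps the source's subscription logic and returns something new. An observable is reduced here to a bare subscribe function (the hypothetical `Subscribe<T>` type), with error and completion handling left out, so this is not the RxJS implementation.

```typescript
// Simplified stand-in for an Observable: just a subscribe function.
type Subscribe<T> = (next: (value: T) => void) => void;

// Hypothetical source that pushes the numbers 1 to 5 to each subscriber.
const oneToFive: Subscribe<number> = (next) => {
  [1, 2, 3, 4, 5].forEach((n) => next(n));
};

// Each operator wraps the subscribe function it is given and returns a new one.
function map<T, R>(input: Subscribe<T>, transform: (value: T) => R): Subscribe<R> {
  return (next) => input((value) => next(transform(value)));
}

function filter<T>(input: Subscribe<T>, predicate: (value: T) => boolean): Subscribe<T> {
  return (next) => input((value) => { if (predicate(value)) next(value); });
}

const evens = filter(oneToFive, (n) => n % 2 === 0);
const evensTimesTen = map(evens, (n) => n * 10);

const fromSource: number[] = [];
const fromPipeline: number[] = [];
oneToFive((n) => fromSource.push(n));      // the source still emits everything
evensTimesTen((n) => fromPipeline.push(n)); // the derived stream is filtered and mapped

console.log(fromSource);   // [1, 2, 3, 4, 5]
console.log(fromPipeline); // [20, 40]
```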

Combining RxJS Observables

The concat() operator

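  • Combines multiple Observables sequentially into one (no interleaving occurs): it subscribes to the next Observable only after the previous one completes.
  • It's useful for sequential processing, e.g. HTTP requests that must run one after another.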

// Emulate first HTTP request that takes 3 sec
let threeSecHTTPRequest = Observable.timer(3000).mapTo('First response');
// Emulate second HTTP request that takes 1 sec
let oneSecHTTPRequest = Observable.timer(1000).mapTo('Second response');
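// Subscription: logs 'First response' after 3 sec, then 'Second response'
// 1 sec later, because the second timer starts only when the first completes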
Observable
	.concat(threeSecHTTPRequest, oneSecHTTPRequest)
	.subscribe(res=>console.log(res));

The merge() operator

  • Combines multiple Observables into one by subscribing to all of them at once (interleaving can occur)

// Emulate first HTTP request that takes 3 sec
let threeSecHTTPRequest = Observable.timer(3000).mapTo('First response');
// Emulate second HTTP request that takes 1 sec
let oneSecHTTPRequest = Observable.timer(1000).mapTo('Second response');
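// Subscription: both timers start together, so this logs 'Second response'
// after 1 sec and 'First response' after 3 sec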
Observable
	.merge(threeSecHTTPRequest, oneSecHTTPRequest)
	.subscribe(res=>console.log(res));

The flatMap() operator

The flatMap() is used to “transform the items emitted by an observable into observables, then flatten the emissions from those into a single observable”.

In RxJS, flatMap() is an alias of mergeMap() so these two operators have the same functionality.

getDrinks()
	.flatMap(drinks=>drinks)
	.subscribe(
		(drink)=>{ console.log("Subscriber got " + drink.name + ": " + drink.price); },
		(error)=>{ console.error(error); },
		()=>{ console.log("The stream of drinks is over"); }
	);

The switchMap() operator

While flatMap() unwraps and merges all the values emitted by the outer observable, the switchMap() operator handles the values from the outer observable but cancels the inner subscription being processed if the outer observable emits a new value.

let outer = Rx.Observable.interval(1000).take(2);
let combined = outer.switchMap((x)=>{
	return Rx.Observable
		.interval(400)
		.take(3)
		.map((y)=>{
			return `outer ${x}: inner ${y}`;
		});
});
combined.subscribe((result)=>{
	console.log(`${result}`);
});

// The third value from the first inner observable (outer 0: inner 2) is not emitted
// into the resulting stream: the outer observable emits 1 before it arrives
/*
outer 0: inner 0
outer 0: inner 1
outer 1: inner 0
outer 1: inner 1
outer 1: inner 2
*/

Subject

An RxJS Subject is both an observable and an observer. This means that you can push data to its observer(s) using next() as well as subscribe to it. A Subject can have multiple observers, which makes it useful for multicasting: emitting a value to multiple subscribers.
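As a rough illustration of multicasting, the sketch below keeps a list of subscribers and pushes every value to all of them. `MiniSubject` is a hypothetical, simplified stand-in (no error or completion handling), not the RxJS class.

```typescript
// Simplified stand-in for an RxJS Subject; the class name is hypothetical.
// It is an observer (it has next()) and an observable (it has subscribe()).
class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  // Register a subscriber and return a function that removes it again.
  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Multicast: push the same value to every current subscriber.
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const prices = new MiniSubject<number>();
const seenByA: number[] = [];
const seenByB: number[] = [];

const unsubscribeA = prices.subscribe((p) => seenByA.push(p));
prices.next(10); // only A is subscribed
prices.subscribe((p) => seenByB.push(p));
prices.next(20); // A and B both receive 20
unsubscribeA();
prices.next(30); // only B is left

console.log(seenByA); // [10, 20]
console.log(seenByB); // [20, 30]
```

Note that B never sees 10: in this sketch, a late subscriber only receives values pushed after it subscribed.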

Multicasting with BehaviorSubject

  • BehaviorSubject is a type of Subject
  • Represent a value that changes over time
  • Every subscriber gets the initial or the latest value emitted by the BehaviorSubject
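  • Examples: login status, state

// RxJS 5 and Angular imports used by the three classes below
import { OnDestroy } from '@angular/core';
import { FormControl } from '@angular/forms';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/do';

export class AppStateService{
	// Holds the latest search criteria; every new subscriber receives it immediately
	public stateEvent:BehaviorSubject<string> = new BehaviorSubject('');

	constructor(){
	}
	set searchCriteria(value:string){
		this.stateEvent.next(value);
	}
}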
export class AppSearchComponent{
	// Must be initialised before valueChanges is read in the constructor
	private searchInput:FormControl = new FormControl('');

	constructor(private stateService:AppStateService){
		this.searchInput
			.valueChanges
			.debounceTime(300)
			.do(value=>console.log(`The user entered ${value}`))
			.subscribe(
				(searchValue)=>{
					this.stateService.searchCriteria = searchValue;
				}
			);
	}
}
export class AppEbayComponent implements OnDestroy{
	private searchFor:string;
	private subscription:Subscription;

	constructor(private stateService:AppStateService){
		this.subscription = this.stateService
					.stateEvent
					.subscribe(
						(event)=>{
							this.searchFor = event;
						}
					);
	}
	ngOnDestroy(){
		this.subscription.unsubscribe();
	}
}
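
The behaviour the listing relies on, that every subscriber gets the initial or the latest value as soon as it subscribes, can be sketched in isolation. `MiniBehaviorSubject` below is a hypothetical, simplified stand-in and not the RxJS class; the values used are made up for illustration.

```typescript
// Simplified stand-in for BehaviorSubject; the class name is hypothetical.
class MiniBehaviorSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  // The constructor argument is the initial value.
  constructor(private current: T) {}

  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener);
    listener(this.current); // replay the initial or latest value immediately
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  next(value: T): void {
    this.current = value; // remember the value for future subscribers
    this.listeners.forEach((listener) => listener(value));
  }
}

const searchCriteria = new MiniBehaviorSubject<string>("");
const early: string[] = [];
const late: string[] = [];

searchCriteria.subscribe((v) => early.push(v)); // gets the initial value ""
searchCriteria.next("rxjs");
searchCriteria.subscribe((v) => late.push(v));  // gets the latest value "rxjs"
searchCriteria.next("angular");

console.log(early); // ["", "rxjs", "angular"]
console.log(late);  // ["rxjs", "angular"]
```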