rx package - greydragon888/real-router GitHub Wiki

@real-router/rx

Reactive Observable API for Real-Router. Zero-cost opt-in for reactive programming patterns.

Overview

@real-router/rx provides reactive stream APIs for Real-Router without adding bundle cost to users who don't need them. The package offers:

  • State streams — Reactive router state changes
  • Event streams — Typed router event streams
  • TC39 Observable — Standards-compliant Observable wrapper
  • Operators — Composable stream transformations
  • Async iterationfor await...of support
  • AbortSignal — Cancellation support

API Reference

state$(router, options?)

Creates a reactive stream of router state changes.

Signature:

function state$(
  router: Router,
  options?: { replay?: boolean },
): RxObservable<SubscribeState>;

Parameters:

Parameter Type Description
router Router Router instance
options.replay boolean Replay current state to new subscriber (default: true)

Returns: RxObservable<SubscribeState>

Type:

interface SubscribeState {
  route: State;
  previousRoute?: State;
}

Example:

import { state$ } from "@real-router/rx";

state$(router).subscribe(({ route, previousRoute }) => {
  console.log("Current route:", route.name);
  console.log("Previous route:", previousRoute?.name);
});

With replay disabled:

// Only receive future navigations, not current state
state$(router, { replay: false }).subscribe((state) => {
  console.log("Route:", state.route.name);
});

With AbortSignal (on subscribe):

const controller = new AbortController();

state$(router).subscribe((state) => console.log("Route:", state.route.name), {
  signal: controller.signal,
});

// Later: unsubscribe
controller.abort();

events$(router)

Creates a reactive stream of all router events.

Signature:

function events$(router: Router): RxObservable<RouterEvent>;

Parameters:

Parameter Type Description
router Router Router instance

Returns: RxObservable<RouterEvent>

Type:

type RouterEvent =
  | { type: "TRANSITION_START"; toState: State; fromState?: State }
  | {
      type: "TRANSITION_SUCCESS";
      toState: State;
      fromState?: State;
      options: NavigationOptions;
    }
  | {
      type: "TRANSITION_ERROR";
      toState?: State;
      fromState?: State;
      error: RouterError;
    }
  | { type: "TRANSITION_CANCEL"; toState: State; fromState?: State }
  | { type: "ROUTER_START" }
  | { type: "ROUTER_STOP" };

Example:

import { events$, filter } from "@real-router/rx";

events$(router)
  .pipe(filter((e) => e.type === "TRANSITION_ERROR"))
  .subscribe((event) => {
    console.error("Navigation error:", event.error);
  });

observable(router)

Creates a TC39 Observable-compliant wrapper for RxJS interop.

Signature:

function observable(router: Router): RxObservable<SubscribeState>;

Parameters:

Parameter Type Description
router Router Router instance

Returns: RxObservable<SubscribeState>

Example:

import { from } from "rxjs";
import { observable } from "@real-router/rx";

from(observable(router)).subscribe(({ route }) => {
  console.log("Route:", route.name);
});

Operators

map<T, R>(project: (value: T) => R)

Transforms emitted values using a projection function.

Signature:

function map<T, R>(project: (value: T) => R): Operator<T, R>;

Example:

import { state$, map } from "@real-router/rx";

state$(router)
  .pipe(map(({ route }) => route.name))
  .subscribe((name) => console.log("Route name:", name));

filter<T>(predicate: (value: T) => boolean)

Filters values based on a predicate function.

Signature:

function filter<T>(predicate: (value: T) => boolean): Operator<T, T>;

Example:

import { state$, filter } from "@real-router/rx";

state$(router)
  .pipe(filter(({ route }) => route.name !== "home"))
  .subscribe((state) => console.log("Non-home route:", state.route.name));

debounceTime<T>(duration: number)

Delays emissions by the specified duration (milliseconds), emitting only the last value.

Signature:

function debounceTime<T>(duration: number): Operator<T, T>;

Example:

import { state$, debounceTime } from "@real-router/rx";

state$(router)
  .pipe(debounceTime(300))
  .subscribe((state) => console.log("Debounced:", state.route.name));

distinctUntilChanged<T>(comparator?: (prev: T, curr: T) => boolean)

Filters consecutive duplicate values. Optional custom comparator.

Signature:

function distinctUntilChanged<T>(
  comparator?: (prev: T, curr: T) => boolean,
): Operator<T, T>;

Example:

import { state$, map, distinctUntilChanged } from "@real-router/rx";

state$(router)
  .pipe(
    map(({ route }) => route.params.userId),
    distinctUntilChanged(),
  )
  .subscribe((userId) => console.log("User ID changed:", userId));

takeUntil<T>(notifier: RxObservable<any>)

Completes the stream when the notifier emits.

Signature:

function takeUntil<T>(notifier: RxObservable<any>): Operator<T, T>;

Example:

import { state$, takeUntil, RxObservable } from "@real-router/rx";

const stop$ = new RxObservable((observer) => {
  setTimeout(() => observer.next(), 5000);
});

state$(router)
  .pipe(takeUntil(stop$))
  .subscribe((state) => console.log("Route:", state.route.name));
// Automatically unsubscribes after 5 seconds

Pipe Composition

The pipe() method composes multiple operators into a transformation pipeline.

Example:

import { state$, map, filter, distinctUntilChanged } from "@real-router/rx";

state$(router)
  .pipe(
    map(({ route }) => route.params.userId),
    filter((id) => id !== undefined),
    distinctUntilChanged(),
  )
  .subscribe((userId) => fetchUser(userId));

Async Iteration

Observables support for await...of via Symbol.asyncIterator.

Example:

import { state$ } from "@real-router/rx";

for await (const { route } of state$(router)) {
  console.log("Current route:", route.name);
  if (route.name === "logout") break;
}

Note: Uses latest-value semantics — rapid emissions may skip intermediate values.


Usage Examples

Event Filtering

import { events$, filter } from "@real-router/rx";

// Track navigation errors
events$(router)
  .pipe(filter((e) => e.type === "TRANSITION_ERROR"))
  .subscribe(({ error }) => {
    errorTracker.capture(error);
  });

// Track successful navigations
events$(router)
  .pipe(filter((e) => e.type === "TRANSITION_SUCCESS"))
  .subscribe(({ toState, fromState }) => {
    analytics.track("navigation", {
      from: fromState?.name,
      to: toState.name,
    });
  });

RxJS Interop

import { from } from "rxjs";
import { debounceTime, distinctUntilChanged } from "rxjs/operators";
import { observable } from "@real-router/rx";

from(observable(router))
  .pipe(
    debounceTime(100),
    distinctUntilChanged((a, b) => a.route.name === b.route.name),
  )
  .subscribe(({ route }) => {
    console.log("Route:", route.name);
  });

Migration Guide

From router[Symbol.observable]()

Before:

import { createRouter } from "@real-router/core";

const router = createRouter(routes);

router[Symbol.observable]().subscribe((observer) => {
  console.log("State:", observer);
});

After:

import { createRouter } from "@real-router/core";
import { observable } from "@real-router/rx";

const router = createRouter(routes);

observable(router).subscribe((observer) => {
  console.log("State:", observer);
});

From RxJS from(router)

Before:

import { from } from "rxjs";

from(router).subscribe(({ route }) => {
  console.log("Route:", route.name);
});

After:

import { from } from "rxjs";
import { observable } from "@real-router/rx";

from(observable(router)).subscribe(({ route }) => {
  console.log("Route:", route.name);
});

Using state$() Instead

For most use cases, state$() is simpler than observable():

import { state$ } from "@real-router/rx";

state$(router).subscribe(({ route }) => {
  console.log("Route:", route.name);
});

See Also

⚠️ **GitHub.com Fallback** ⚠️