@tsaqib/trex
0.9.31 • Public • Published

Reactive Extension in TypeScript (TRex)

Free applications such as Facebook and Google pay an enormous computational cost for our online activity. Even with billions of dollars to spend, building an optimized and profitable system is far from trivial. Applications at that scale are often built on the reactive programming paradigm, which lets them process data only when needed. Responding only to the relevant queries results in large cost savings, which helps internet-scale applications serve us instantly without charging us as much. Frameworks such as Angular, Vue, and React are also built on reactive programming principles.

This package helps you do functional reactive programming in both server-side and front-end apps. It lets you define and destroy data streams easily, and each stream works as an event bus. You can subscribe to and unsubscribe from the streams. You can apply map, filter, and your own operators at the observable level, then process the results however you like in the observers.
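To make the event-bus idea concrete, here is a minimal, self-contained sketch of a stream with subscribe, unsubscribe, and emit. `MiniStream` and `Listener` are hypothetical names used only for illustration; this is not the package's implementation, just one way the pattern can be written.

```typescript
// Hypothetical stand-in types; these are NOT the package's classes.
type Listener<T> = (item: T) => void;

class MiniStream<T> {
	private listeners: Listener<T>[] = [];

	// Register a listener so it receives every later item.
	subscribe(listener: Listener<T>): void {
		this.listeners.push(listener);
	}

	// Remove a previously registered listener.
	unsubscribe(listener: Listener<T>): void {
		this.listeners = this.listeners.filter((l) => l !== listener);
	}

	// Push one item to every currently registered listener.
	emit(item: T): void {
		for (const listener of this.listeners) {
			listener(item);
		}
	}
}

const received: string[] = [];
const onMessage: Listener<string> = (m) => {
	received.push(m);
};

const stream = new MiniStream<string>();
stream.subscribe(onMessage);
stream.emit('first');
stream.unsubscribe(onMessage);
stream.emit('second'); // nobody is listening any more

// received now holds only 'first'
```

The listener stops receiving items as soon as it is unsubscribed, which is the behaviour an event bus needs to avoid leaking handlers.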

This package has zero dependencies as everything was written from scratch.

Installation

npm i -S @tsaqib/trex

Examples:

See /tests directory for more examples

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

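// Your custom data processor; used as a sample here.
// WorkflowEngine and validateJSON are placeholders, not part of this package.
const workflowEngine = new WorkflowEngine();
const messageObservable = new tx.Observable();

const workflowQueue = new tx.Observer((message) => {
	const workflowObservable = new tx.Observable();
	// Wrap process so it keeps its `this` when invoked as a callback
	const workflowObserver = new tx.Observer((m) => workflowEngine.process(m));
	// Subscribe before emitting; otherwise nothing receives the message
	workflowObservable.subscribe(workflowObserver);
	workflowObservable.emit(message);
});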
const notifier = new tx.Observer((message) => {
	// TODO: notify office channel
});
const log = (message) => {
	// TODO: log all oncoming messages
};

messageObservable
	.pipe(
		tx.tap(log),
		tx.take(10), // Take first 10 messages
		tx.filter((m) => validateJSON(m)),
		tx.map((m) => JSON.parse(m)),
		tx.pluck('message')
	)
	.multicast(workflowQueue, notifier);

messageObservable.emit('{ "message": "I am unwell.", "to": "#office" }');
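The example above calls `validateJSON`, which the package does not provide. A minimal sketch of such a helper, assuming it only needs to report whether a string parses as JSON, could look like this:

```typescript
// Hypothetical helper for the example above; not part of @tsaqib/trex.
// Returns true when the text parses as JSON, false otherwise.
function validateJSON(text: string): boolean {
	try {
		JSON.parse(text);
		return true;
	} catch {
		return false;
	}
}

const okMessage = validateJSON('{ "message": "I am unwell.", "to": "#office" }');
const badMessage = validateJSON('I am unwell.');
// okMessage is true, badMessage is false
```

Parsing twice (once to validate, once in the map step) is simple but wasteful; a single map step that returns a parsed value or a sentinel would avoid the duplicate work.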

API documentation

Classes

Class: FilterOperator

This operator runs each data item through the predicate you pass to it. If the item satisfies the predicate, the operator passes it on, so the observers receive the item on emit.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable
	.pipe(
		tx.map((num) => num * 3),
		tx.filter((num) => num > 10)
	)
	.subscribe(observer);
observable.emit(10);

Hierarchy

OperatorBase

FilterOperator

Methods

emit

emit(item: any): void

Overrides OperatorBase.emit

Defined in operators/FilterOperator.ts:32

Applies the specified predicate on the item and returns it when the predicate returns true.

Parameters:

Name Type Description
item any The item

Returns: void
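The filtering behaviour described above can be sketched independently of the package. `filterStage` is a hypothetical helper, not the FilterOperator source: it wraps a downstream callback and forwards an item only when the predicate holds.

```typescript
// Hypothetical sketch of a filter step; not the package's FilterOperator.
function filterStage<T>(
	predicate: (item: T) => boolean,
	next: (item: T) => void
): (item: T) => void {
	return (item) => {
		// Items that fail the predicate are silently dropped.
		if (predicate(item)) {
			next(item);
		}
	};
}

const passed: number[] = [];
const emitFiltered = filterStage<number>(
	(n) => n > 10,
	(n) => {
		passed.push(n);
	}
);

for (const n of [5, 10, 15, 30]) {
	emitFiltered(n);
}
// passed now holds 15 and 30
```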

Class: MapOperator

Executes a standard 1:1 map function on each incoming item and passes on the computed item. Mapping is the default behaviour of OperatorBase, so this class is effectively a placeholder.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable
	.pipe(
		tx.map((num) => num * 2),
		tx.map((num) => num * 3)
	)
	.subscribe(observer);
observable.emit(10);
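As a rough illustration of how two chained map steps combine, the sketch below builds the chain from plain functions. `mapStage` is a hypothetical name and the code is independent of the package; it only shows that each step feeds its result to the next.

```typescript
// Hypothetical sketch of a 1:1 map step; not the package's MapOperator.
function mapStage<A, B>(
	fn: (item: A) => B,
	next: (item: B) => void
): (item: A) => void {
	return (item) => next(fn(item));
}

const outputs: number[] = [];

// Build the chain back to front: the last step writes to the sink.
const tripled = mapStage(
	(n: number) => n * 3,
	(n: number) => {
		outputs.push(n);
	}
);
const doubledThenTripled = mapStage((n: number) => n * 2, tripled);

doubledThenTripled(10);
// outputs now holds 60, because 10 * 2 * 3 = 60
```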

Hierarchy

OperatorBase

MapOperator

Class: Observable

An Observable represents a stream of data and passes each item on to its observers. Use the subscribe function to add an observer and the emit function to add new data to the stream.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();

implements {IObservable}

Constructors

constructor

+ new Observable(): Observable

Defined in Observable.ts:26

Constructs an Observable, which is an implementation of IObservable.

Returns: Observable

Properties

observers

observers: IObserver[]

Implementation of IObservable.observers

Defined in Observable.ts:25


Optional pipeHead

pipeHead? : LinkedList‹IObservable›

Implementation of IObservable.pipeHead

Defined in Observable.ts:26

Methods

destroy

destroy(): void

Implementation of IObservable

Defined in Observable.ts:131

Returns: void


emit

emit(items: any | any[]): void

Implementation of IObservable

Defined in Observable.ts:68

Parameters:

Name Type
items any | any[]

Returns: void


multicast

multicast(...observers: IObserver[]): void

Implementation of IObservable

Defined in Observable.ts:118

Parameters:

Name Type
...observers IObserver[]

Returns: void


pipe

pipe(...observables: IObservable[]): IObservable

Implementation of IObservable

Defined in Observable.ts:96

Parameters:

Name Type
...observables IObservable[]

Returns: IObservable


subscribe

subscribe(observer: IObserver): void

Implementation of IObservable

Defined in Observable.ts:38

Parameters:

Name Type
observer IObserver

Returns: void


unsubscribe

unsubscribe(observer: IObserver): void

Implementation of IObservable

Defined in Observable.ts:44

Parameters:

Name Type
observer IObserver

Returns: void

Class: Observer

The Observer gives you the basis for an observer. You can pass a function or pipe to the constructor, or subclass the class to make your own observer.

implements {IObserver}

Hierarchy

  • Observer

Constructors

constructor

+ new Observer(next: function, error?: undefined | function): Observer

Defined in Observer.ts:12

Constructs an Observer.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

// The Observer is the implementation of IObserver
const observer1 = new tx.Observer(
	(item: string) => console.log(item),
	(err: any) => console.error(err)
);
const observer2 = new tx.Observer(console.log);

Parameters:

next: function

The function to invoke on data arrival

▸ (item: any): void

Parameters:

Name Type
item any

Optional error: undefined | function

The error handler function

Returns: Observer

Properties

Optional error

error? : undefined | function

Implementation of IObserver.error

Defined in Observer.ts:12


next

next: function

Implementation of IObserver.next

Defined in Observer.ts:11

Type declaration:

▸ (item: any): void

Parameters:

Name Type
item any

Class: OperatorBase

This class provides the basis for your own operators and for the operators already included in this package. Operators inherit from Observable, so they have the same methods and properties.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

class Squarer extends tx.OperatorBase {
	emit (item: number) {
		this.observable.emit(item * item);
	}
}

Constructors

constructor

+ new OperatorBase(fn: function): OperatorBase

Overrides Observable.constructor

Defined in operators/OperatorBase.ts:30

Constructs an OperatorBase.

** Warning: You should use this only by subclassing.

Parameters:

fn: function

The function to apply to the item

▸ (item: any): any

Parameters:

Name Type
item any

Returns: OperatorBase

Properties

fn

fn: function

Defined in operators/OperatorBase.ts:28

Type declaration:

▸ (item: any): any

Parameters:

Name Type
item any

observable

observable: IObservable

Defined in operators/OperatorBase.ts:29


Optional pipeHead

pipeHead? : LinkedList‹IObservable›

Implementation of IObservable.pipeHead

Overrides Observable.pipeHead

Defined in operators/OperatorBase.ts:30

Methods

emit

emit(item: any | any[]): void

Implementation of IObservable

Overrides Observable.emit

Defined in operators/OperatorBase.ts:50

Parameters:

Name Type
item any | any[]

Returns: void


pipe

pipe(...observables: IObservable[]): never

Implementation of IObservable

Overrides Observable.pipe

Defined in operators/OperatorBase.ts:54

Parameters:

Name Type
...observables IObservable[]

Returns: never


subscribe

subscribe(observer: IObserver): void

Implementation of IObservable

Overrides Observable.subscribe

Defined in operators/OperatorBase.ts:46

Parameters:

Name Type
observer IObserver

Returns: void

Class: PluckOperator

This operator returns the specified property of a value.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
observable
	.pipe(tx.take(1), tx.pluck('email'))
	.subscribe(new tx.Observer(console.log));
observable.emit({ name: 'King', email: 'email@kingdom' });
observable.emit({ name: 'Queen', email: 'email@queendom' });

// Output: email@kingdom

Hierarchy

OperatorBase

PluckOperator

Constructors

constructor

+ new PluckOperator(propName: string): PluckOperator

Overrides OperatorBase.constructor

Defined in operators/PluckOperator.ts:25

Constructs the PluckOperator

Parameters:

Name Type Default
propName string ""

Returns: PluckOperator

Properties

Private propName

propName: string

Defined in operators/PluckOperator.ts:30

Methods

emit

emit(item: any): void

Overrides OperatorBase.emit

Defined in operators/PluckOperator.ts:43

Emits the property of an item as specified by the propName in the PluckOperator's constructor.

Parameters:

Name Type Description
item any The item

Returns: void
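The property lookup described above can be sketched without the package. `pluckStage` and `Contact` are hypothetical names; the sketch is typed more strictly than the documented `any`-based signature, purely to keep the example safe.

```typescript
// Hypothetical sketch of a pluck step; not the package's PluckOperator.
function pluckStage<T, K extends keyof T>(
	propName: K,
	next: (value: T[K]) => void
): (item: T) => void {
	// Forward only the named property of each incoming item.
	return (item) => next(item[propName]);
}

interface Contact {
	name: string;
	email: string;
}

const emails: string[] = [];
const emitContact = pluckStage<Contact, 'email'>('email', (e) => {
	emails.push(e);
});

emitContact({ name: 'King', email: 'email@kingdom' });
emitContact({ name: 'Queen', email: 'email@queendom' });
// emails now holds both addresses, in emit order
```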

Class: TakeOperator

This operator keeps count of the items it has encountered and only allows them to pass through as long as the count does not exceed a specified total.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.take(3)).subscribe(observer);
observable.emit([10, 20, 30, 40, 50, 60]);

// Output:
// 10
// 20
// 30

Hierarchy

OperatorBase

TakeOperator

Constructors

constructor

+ new TakeOperator(count: number): TakeOperator

Overrides OperatorBase.constructor

Defined in operators/TakeOperator.ts:27

Constructs the TakeOperator

Parameters:

Name Type Default
count number 0

Returns: TakeOperator

Properties

Private count

count: number

Defined in operators/TakeOperator.ts:32


total

total: number = 0

Defined in operators/TakeOperator.ts:27

Methods

emit

emit(item: any): void

Overrides OperatorBase.emit

Defined in operators/TakeOperator.ts:43

Emits the item as long as the current count of items doesn't exceed the total allocated by count.

Parameters:

Name Type Description
item any The item

Returns: void
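The counting logic described above can be sketched as a closure that remembers how many items it has forwarded. `takeStage` is a hypothetical helper, not the TakeOperator source, and the variable names `count` and `total` simply mirror the documented properties.

```typescript
// Hypothetical sketch of a take step; not the package's TakeOperator.
function takeStage<T>(count: number, next: (item: T) => void): (item: T) => void {
	let total = 0; // number of items forwarded so far
	return (item) => {
		if (total < count) {
			total += 1;
			next(item);
		}
		// Once the limit is reached, later items are dropped.
	};
}

const taken: number[] = [];
const emitTaken = takeStage<number>(3, (n) => {
	taken.push(n);
});

for (const n of [10, 20, 30, 40, 50, 60]) {
	emitTaken(n);
}
// taken now holds 10, 20 and 30
```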

Class: TapOperator

This operator runs a data item through the function you pass on to it, but returns the original value.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const square = (num) => num * num;
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.tap(square)).subscribe(observer);
observable.emit(10);

// Output: 10

Hierarchy

OperatorBase

TapOperator

Methods

emit

emit(item: any): void

Overrides OperatorBase.emit

Defined in operators/TapOperator.ts:32

Applies the specified function on the item, but returns the original value.

Parameters:

Name Type Description
item any The item

Returns: void
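A tap step is useful for side effects such as logging, because the value that continues down the chain is untouched. The sketch below uses a hypothetical `tapStage` helper, independent of the package, to show that the function's result is discarded.

```typescript
// Hypothetical sketch of a tap step; not the package's TapOperator.
function tapStage<T>(
	fn: (item: T) => unknown,
	next: (item: T) => void
): (item: T) => void {
	return (item) => {
		fn(item); // run for its side effect; the result is ignored
		next(item); // forward the original item unchanged
	};
}

const squares: number[] = [];
const seen: number[] = [];
const emitTapped = tapStage<number>(
	(n) => {
		squares.push(n * n);
	},
	(n) => {
		seen.push(n);
	}
);

emitTapped(10);
// squares holds 100 (the side effect), seen holds 10 (the original item)
```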

Class: TxContext

This is an internal class that maintains internal state. It is not meant for public use.

** Warning: You should never use this class.

Hierarchy

  • TxContext

Constructors

Private constructor

+ new TxContext(): TxContext

Defined in TxContext.ts:19

Returns: TxContext

Properties

Static maps

maps: ObserverMap[] = []

Defined in TxContext.ts:22

Methods

Static addMap

addMap(observer: IObserver, observable: IObservable, chainHead?: LinkedList‹IObservable›): void

Defined in TxContext.ts:44

Adds a tuple of observer, observable and the head of the call's linked list.

memberof TxContext

static

Parameters:

Name Type Description
observer IObserver The observer
observable IObservable -
chainHead? LinkedList‹IObservable› -

Returns: void


Static getMap

getMap(observer: IObserver): ObserverMap[] | undefined

Defined in TxContext.ts:60

Gets a tuple list of observer, observable and the head of the call's linked list for a given IObserver

memberof TxContext

static

Parameters:

Name Type Description
observer IObserver The observer to lookup

Returns: ObserverMap[] | undefined


Static print

print(): void

Defined in TxContext.ts:31

Returns: void


Static removeMap

removeMap(map: ObserverMap): void

Defined in TxContext.ts:71

Removes a tuple for a given ObserverMap instance

memberof TxContext

static

Parameters:

Name Type
map ObserverMap

Returns: void

Type aliases

LinkedList

Ƭ LinkedList: object

Defined in Shorthands.ts:8

Type declaration:


ObserverMap

Ƭ ObserverMap: object

Defined in TxContext.ts:5

Type declaration:

Functions

Const filter

filter(fn: function): FilterOperator‹›

Defined in Shorthands.ts:65

Returns an item only when the specified predicate is true.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
const observer = new tx.Observer((num) => console.log(num * 4));
observable.pipe(tx.filter((num) => num < 15)).subscribe(observer);
observable.emit(10);
observable.emit(20);

// Output: 40

Parameters:

fn: function

The predicate to check with the item

▸ (item: any): any

Parameters:

Name Type
item any

Returns: FilterOperator‹›


Const map

map(fn: function): MapOperator‹›

Defined in Shorthands.ts:37

Executes standard 1:1 map function on an incoming item and returns the computed item back.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observer = new tx.Observer(console.log);
const observable = new tx.Observable();
observable
	.pipe(
		tx.map((num: number) => num * 2),
		tx.map((num: number) => num * 3)
	)
	.subscribe(observer);
observable.emit(10);

// Output: 60

Parameters:

fn: function

The function to apply on the item

▸ (item: any): any

Parameters:

Name Type
item any

Returns: MapOperator‹›


Const pluck

pluck(propName: string): PluckOperator‹›

Defined in Shorthands.ts:115

Returns the specified property of a value.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
observable
	.pipe(tx.take(1), tx.pluck('email'))
	.subscribe(new tx.Observer(console.log));
observable.emit({ name: 'King', email: 'email@kingdom' });
observable.emit({ name: 'Queen', email: 'email@queendom' });

// Output: email@kingdom

Parameters:

Name Type Description
propName string The name of the property to return from the item

Returns: PluckOperator‹›


Const take

take(count: number): TakeOperator‹›

Defined in Shorthands.ts:90

Returns up to a specified number of items.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.take(3)).subscribe(observer);
observable.emit([10, 20, 30, 40, 50, 60]);

// Output:
// 10
// 20
// 30

Parameters:

Name Type Description
count number The total number of items allowed to pass through

Returns: TakeOperator‹›


Const tap

tap(fn: function): TapOperator‹›

Defined in Shorthands.ts:139

Applies a function, but returns the original item

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const square = (num) => num * num;
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.tap(square)).subscribe(observer);
observable.emit(10);

// Output: 10

Parameters:

fn: function

The function to apply on the item.

▸ (item: any): any

Parameters:

Name Type
item any

Returns: TapOperator‹›

Interfaces

@tsaqib/trexIObservable

Interface: IObservable

The interface behind the Observable, maintains the contract for all observables.

interface

Hierarchy

  • IObservable

Implemented by

Index

Properties

Methods

Properties

observers

observers: IObserver[]

Defined in IObservable.ts:18

Maintains a list of observers subscribed to the observable.

** Warning: Never touch this in your use.

memberof IObservable


Optional pipeHead

pipeHead? : LinkedList‹IObservable›

Defined in IObservable.ts:90

** Warning: Do not use this. This is an internal pointer for tracking and cleaning up subscriptions.

memberof IObservable

Methods

Optional destroy

destroy(): void

Defined in IObservable.ts:163

Destroys an Observable along with all its subscribers.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

// Observable is an implementation of IObservable
const observable = new tx.Observable();
observable.subscribe(new tx.Observer(console.log));
observable.emit(10);
observable.destroy();

memberof IObservable

Returns: void


emit

emit(item: any | any[]): void

Defined in IObservable.ts:82

Emits an item to the stream

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

// Observable is an implementation of IObservable
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.subscribe(observer);
observable.emit(10);

memberof IObservable

Parameters:

Name Type Description
item any | any[] The item(s) to stream; can be an array, too

Returns: void
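Because emit accepts either one item or an array, a sketch of how that fan-out could work may help. `emitAll` is a hypothetical helper, and the assumption that each array element is delivered separately is based on the take examples in this document, not on the package source.

```typescript
// Hypothetical sketch of "single item or array" emission; not the package's code.
// Assumption: an array is treated as a batch and delivered element by element.
function emitAll(items: number | number[], next: (item: number) => void): void {
	const list = Array.isArray(items) ? items : [items];
	for (const item of list) {
		next(item);
	}
}

const delivered: number[] = [];
const sink = (n: number): void => {
	delivered.push(n);
};

emitAll(10, sink); // a single item
emitAll([20, 30], sink); // a batch of two items
// delivered now holds 10, 20 and 30
```

One consequence of this design is that an array can never be delivered as a single value unless it is wrapped in another array or an object first.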


multicast

multicast(...observers: IObserver[]): void

Defined in IObservable.ts:143

Subscribes several observers in one go, typically called after a pipe.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

const observer1 = new tx.Observer(console.log);
const observer2 = new tx.Observer((x) => console.log(x * x));

// Observable is an implementation of IObservable
const observable = new tx.Observable();
observable
	.pipe(
		tx.map((x) => x * 2),
		tx.filter((x) => x > 5)
	)
	.multicast(observer1, observer2);
observable.emit(50);

memberof IObservable

Parameters:

Name Type Description
...observers IObserver[] An array of Observer to update

Returns: void
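Conceptually, multicast is a convenience over calling subscribe once per observer. The sketch below shows that idea with a hypothetical `FanOut` class; it is not the package's Observable.

```typescript
// Hypothetical sketch of multicast as repeated subscribe; not the package's code.
type Sink<T> = (item: T) => void;

class FanOut<T> {
	private sinks: Sink<T>[] = [];

	subscribe(sink: Sink<T>): void {
		this.sinks.push(sink);
	}

	// Subscribe any number of sinks in a single call.
	multicast(...sinks: Sink<T>[]): void {
		for (const sink of sinks) {
			this.subscribe(sink);
		}
	}

	// Deliver the item to every subscribed sink, in subscription order.
	emit(item: T): void {
		for (const sink of this.sinks) {
			sink(item);
		}
	}
}

const plain: number[] = [];
const squared: number[] = [];
const fanOut = new FanOut<number>();
fanOut.multicast(
	(x) => {
		plain.push(x);
	},
	(x) => {
		squared.push(x * x);
	}
);
fanOut.emit(7);
// plain holds 7 and squared holds 49
```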


pipe

pipe(...observables: IObservable[]): IObservable

Defined in IObservable.ts:115

Pipes a series of operations per item in the stream. All operators must be inside a pipe.

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

// Observable is an implementation of IObservable
const observable = new tx.Observable();
observable
	.pipe(
		tx.map((x) => x * 2),
		tx.filter((x) => x > 5)
	)
	.subscribe(new tx.Observer(console.log));
observable.emit(50);

memberof IObservable

Parameters:

Name Type Description
...observables IObservable[] The observables that form a chain of actions.

Returns: IObservable
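One way to picture a pipe is as a chain in which every step decides whether and what to hand to the next step. The sketch below composes such steps with `reduceRight`; `Stage` and `pipeStages` are hypothetical names, and the package itself documents a linked list of observables rather than this function-based approach.

```typescript
// Hypothetical sketch of chaining steps; not the package's pipe implementation.
type Stage = (item: number, next: (item: number) => void) => void;

function pipeStages(stages: Stage[], sink: (item: number) => void): (item: number) => void {
	// Build from the last stage backwards so each stage forwards to the one after it.
	return stages.reduceRight<(item: number) => void>(
		(next, stage) => (item) => stage(item, next),
		sink
	);
}

const double: Stage = (item, next) => next(item * 2);
const overFive: Stage = (item, next) => {
	if (item > 5) {
		next(item);
	}
};

const results: number[] = [];
const emitPiped = pipeStages([double, overFive], (n) => {
	results.push(n);
});

emitPiped(50); // doubled to 100, passes the filter
emitPiped(2); // doubled to 4, dropped by the filter
// results now holds only 100
```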


subscribe

subscribe(observer: IObserver): void

Defined in IObservable.ts:39

Subscribes an Observer instance

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

// Observable is an implementation of IObservable
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.subscribe(observer);
observable.emit(10);

memberof IObservable

Parameters:

Name Type Description
observer IObserver The Observer instance to be subscribed for update

Returns: void


unsubscribe

unsubscribe(observer: IObserver): void

Defined in IObservable.ts:61

Unsubscribes an Observer instance

Basic usage example:

import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");

// Observable is an implementation of IObservable
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.subscribe(observer);
observable.emit(10);
observable.unsubscribe(observer);

memberof IObservable

Parameters:

Name Type Description
observer IObserver The Observer instance to be unsubscribed from updates

Returns: void

@tsaqib/trexIObserver

Interface: IObserver

The interface behind the Observer, maintains the contract for all observers.

interface

Hierarchy

  • IObserver

Implemented by

Index

Properties

Methods

Properties

next

next: function

Defined in IObserver.ts:13

Whenever a new item is available in the stream, the next function is called with that.

param The item newly arrived in the stream.

memberof IObserver

Type declaration:

▸ (item: any): void

Parameters:

Name Type
item any

Methods

Optional error

error(err: any): void

Defined in IObserver.ts:21

The error handler for any exception that occurs inside the next function.

memberof IObserver

Parameters:

Name Type Description
err any The error object.

Returns: void
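The relationship between next and error can be sketched as a try/catch around the next call. `SketchObserver` and `deliver` are hypothetical names; rethrowing when no error handler is present is an assumption of this sketch, not documented behaviour of the package.

```typescript
// Hypothetical sketch of routing exceptions from next to error; not the package's code.
interface SketchObserver<T> {
	next: (item: T) => void;
	error?: (err: unknown) => void;
}

function deliver<T>(observer: SketchObserver<T>, item: T): void {
	try {
		observer.next(item);
	} catch (err) {
		if (observer.error) {
			observer.error(err);
		} else {
			// Assumption: without a handler, the exception propagates to the caller.
			throw err;
		}
	}
}

const parsed: unknown[] = [];
const failures: string[] = [];
const jsonObserver: SketchObserver<string> = {
	next: (text) => {
		parsed.push(JSON.parse(text));
	},
	error: (err) => {
		failures.push(err instanceof Error ? err.name : String(err));
	},
};

deliver(jsonObserver, '{"ok": true}');
deliver(jsonObserver, 'not json'); // JSON.parse throws, so error is called
// parsed holds one object and failures holds 'SyntaxError'
```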

License

ISC

Collaborators

  • tsaqib