Basic Understanding of RxJS

Anjali Tanpure
9 min readApr 29, 2020

Hello everyone! In this article I will be talking about RxJS as I have just completed a course from Pluralsight on RxJS in Angular: Reactive Development by the author Deborah Kurata. Thanks to her as she has explained the terms very well. This post includes some of the learning from this course as well. Let’s get started!

RxJs is a library that brings the concept of “reactive programming” to the web. With Reactive Extension of JavaScript i.e. RxJS, we leverage patterns to collect data from multiple sources. Combine data for display, cache data to improve performance and react to the user.

Reactive extensions were originally developed by Microsoft as Rx.Net. Since that time, the reactive extension library has been implemented for several other languages including Java(RxJava), Python(RxPy), Ruby(Rx.rb), JavaScript(RxJS).

What is RxJS ?

According to rxjs.dev,

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

But what does it mean ?

Imagine RxJS as, “Manage the incoming data as it flows through time” i.e. the streams. stream is nothing but a sequence of data elements made available over time. It can be thought of as flow of data being processed one at a time rather than in chunks.

Imagine a stream as a conveyor belt let’s say in a bottle factory, each bottle being processed one at a time rather than in large batches.

source: https://stock.adobe.com
Image source: https://stock.adobe.com

The data can be of any type such as numbers, strings, or an array of employees or messages or can be a response returned from an HTTP request. The stream can come from user input such as mouse or keyboard events. We can use RxJS, “to manage a stream of any data”.

RxJS Terms

Observer / Subscriber:

Imagine an observer as an employee in a factory who is observing the bottle as they are emitted onto a conveyor belt. So as an observer, employee get notification as:

  • The next bottle comes so that he can process it in whatever way he wants.
    [ next item process it: next() ]
  • If some bottle has a defect [ error occurred, handle it: error() ]
  • When all bottles has been completed [ complete, you’re done: complete() ]

So RxJS observer observes the stream, & responds to it’s notification.
From Angular documentation observer is,

A JavaScript object that defines the handlers for the notifications you receive.

Basically in RxJS, observer is also defined as an interface with next(), error() and complete() methods. One such class that implements an observer interface is Subscriber. In RxJS, each observer converted into a subscriber.
A subscriber is an observer with additional features to unsubscribe from an observable.
Let’s see how observable looks like :

const observer = {
next: bottle => console.log(`Bottle is emitted ${bottle}`);
error: err => console.log(`Error occurred: ${err}`);
complete: () => console.log(`No more bottles. Goodbye for the day!`);
};

Observable / Observable Stream

Referring to the above images, our stream is a bottle moving along the conveyor. In RxJS, a stream of data is called an observable stream. Also called an ‘observable sequence’ or just an ‘observable’ or ‘stream’. Observables can emit finite/infinite numbers of values.

Observables can be:

  • Synchronous — items are emitted to the stream immediately
  • Asynchronous — items are emitted at some future point in time

We can create observables using the ‘new’ keyword.

const bottleStream = new Observable (bottleObserver => {
bottleObserver.next(‘bottle 1’);
bottleObserver.next(‘bottle 2’);
bottleObserver.complete();
});

Using the above example to understand the concepts. Normally we do not write code like this. There are easier ways to create observable streams.

Will the above code emit two bottle strings to our streams ? Well the answer is NO. Observables are lazy, and don’t execute when they are defined. So guys what’s missing here ? Yes, we have not started the stream. We start the stream by calling the subscribe() method of the observable.

We must subscribe to the observable to start a stream otherwise, no values are emitted. Like this:

const sub = bottleStream.subscribe(observer);

When we subscribe we pass, in the observer to monitor that subscription & react each time an item is emitted. subscribe() method returns a subscription, which represents the execution of the observable.

Let’s see how it’s executed ?

  1. As we have subscribed to the observable, code in the constructor gets executed.
  2. It first calls next() on the observer, displaying the next method message in the console.
  3. It calls next() again, displaying a second message in the console.
  4. Lastly it calls the complete() method, stopping our stream and displaying our complete message.

When working with observables we don’t normally create an observer like above, and pass it into the subscribe method. Rather we pass the next, error and complete methods, directly into the subscribe method like this:

const sub = bottleStream.subscribe({
next: bottle => console.log(`Bottle is emitted ${bottle}`);
error: err => console.log(`Error occurred: ${err}`);
complete: () => console.log(`No more bottles. Goodbye for the day!`);
});

Each of the arguments to the observer i.e. next, error, complete are optional, only specify what is needed.

Stop the stream:

Now, in a bottle factory at the end of day, what happens if users do not stop the conveyor belt ? Well bottles get spread everywhere in factories. Similarly if we do not stop the RxJS stream, (like we stopped above using complete method) it will create memory leak issues. There are multiple ways to stop the stream:

  • Call observer’s complete() method.
  • Use completing operators like : of(), from().
  • If an observer throws an error, the stream will stop automatically.
  • By unsubscribing the subscription using unsubscribe method: sub.unsubscribe();

Creating observables:

In Angular we often work with observables, which Angular creates for us. But sometimes we want to create one ourselves. We can create observables by using an observable constructor using the new keyword that we have seen above. But RxJS provides simple functions for us to create observables they are:

  • of(): Creates an observable using a set of defined values.
    const bottleStream = of(‘bottle1’, ‘bottle2’);
  • from(): creates an observable from an array/other data structure.
    const bottleStream = from([‘bottle1’, ‘bottle2’]);
  • fromEvent(): Creates an observable with any DOM event.
    const source = fromEvent(document, ‘click’);

These commonly used creation functions are static and we can call them without object. There are some more creation functions available in RxJS.

I hope you have understood the important terms of RxJS. Now moving forward to the RxJS operators.

Operators

Operators are nothing but functions. In RxJS, we pipe each emitted item through a set of operators. The operators can transform, filter, or process the item. We also use operators to compose streams. We can apply multiple operators in sequence using the observable pipe method.

Take a look at the example below! There is an observable stream of three numbers including 3, 2 and 4 which is our source observable.

of(3, 2, 4).pipe(
map(val => val * 2),
tap(val => console.log(val)),
take(2)
).subscribe(console.log);

pipe():

We have called the pipe() method of the observable and pass different RxJS operators like map, tap and take to it separated by commas. Now when we subscribe to the source observable, the source observable stream starts emitting the items. Each operator pipes through the series of operators in sequence.

Here first 3 will be emitted and processed through each operator, then 2 and then 4. The value output by the last operator is the value emitted to the result observable and processed by the observable’s next() method which we pass to the subscribe.

When we subscribe to an observable with operators, each operator takes an observable as an input, the operator subscribes to that input observable and creates, processes and returns an output observable. Now that output observable will be the input observable for the next operator in sequence. We can think theses operators as a pipeline.

Let’s move on to the operators that we have used in our pipe to solve this example.

map():

This is the most commonly used operator in RxJS. map transforms or changes each emitted item as defined by a function we provide, hence it is called as a transformation operator. Here in our example, we map/transforms each emitted item to two times it’s value. For each item in the source, exactly one mapped item is emitted.

Used for: Making changes to each item in the source stream.

tap():

The purpose of the tap operator is to perform an operation that does not affect the stream. Formally, tap is a utility operator. If we aren’t getting the results that we expect, then we can use a tap operator to debug the issue. For tap, input observable is the same as output observable.

Used for:

  1. Debugging
  2. Performing actions outside the flow of data that don’t modify the stream.

take():

The take operator emits specified number of items. It is a filtering operator. When an item is emitted, take counts the item. If the count ≤ specified number, it emits the output stream. When it equals to the specified number, it completes the stream.

Used for:

  1. Taking a specified numbers of items.
  2. Limiting unlimited streams such as streams created by an ‘interval’ function.

I hope you understand these operators, Now, let’s execute our example.
Our source observable stream contains 3 numbers: 3, 2 and 4 respectively.

  1. map: First 3 will be emitted and pass it to map function: 3 * 2 = 6.
  2. tap: The first tap logs it to the console. // 6
  3. take: will taken it as first time
  4. subscribe: logs 6 to the console. // 6
  5. map: Secondly 2 will be emitted, so 2 * 2 = 4.
  6. tap: The second tap logs it to the console. // 4
  7. take: will taken it as second time
  8. subscribe: logs 4 to the console. // 4

As we have provided ‘ 2’ as an argument to take. So it will emit at most two values and stream will be stopped and no 3rd value will be emitted i.e. 4.

So final output of our example will be:

of(3, 2, 4).pipe(
map(val => val * 2),
tap(val => console.log(val)),
take(2)
).subscribe(console.log);
Output:
6
6
4
4

There are about 100+ built-in RxJS operators. It takes time to learn what they all do and how they can be used. Now let’s see how to handle errors with observables.

Error Handling

While working with observables it’s important to catch the errors. If there is an error, any stream will be stopped and no more values can emit. There are two strategies to handle the errors with observables. In both strategies, the first step is to catch the error that brings us to the next() operator.

  1. Catch and Replace
  2. Catch and Rethrow

Catch and Replace:

It involves catching the error & replacing errored observable with a new observable. Replacement of errored observable depends on the error and observable. We could return hard-coded data or locally stored data or empty array/value as a replaced observable when error comes using catchError().

catchError():
Catches an error that occurred on observable. Here we catch the error & pass in a handleError method for processing the error.
catchError(this.handleError);

catchError is useful to rethrow the error or replacing the errored observable so that we can continue the stream even after error occurs.

const http$ = this.http.get<Bottle[]>(‘/api/bottlesDetail’);
http$
.pipe(
catchError(err => of([{id: 1, material: 'glass', color: 'red'}])
.subscribe(
res => console.log(‘HTTP response’, res),
err => console.log(‘HTTP Error’, err),
() => console.log(‘HTTP request completed.’)
);

When the error occurs, catchError operator is going to take error, unsubscribes from the input stream and return an observable which is an array of a bottle object, using of operator. The observer does not get notified of the error as catchError already handled the error. Instead observable’s next method will call with the new observable from catchError operator.

Catch and Rethrow:

In this mechanism, we rethrow the error caught by the catchError using ‘throwError’. throwError creates an observable that never emits any value. Instead, it errors out immediately using the error caught by catchError.

const http$this.http.get<Bottle[]>('/api/bottlesDetail');http$
.pipe(
map(res => res['payload']),
catchError(err => {
console.log('caught mapping error and rethrowing', err);
return throwError(err);
})
)

Here, we rethrow an error from catchError. This is how we can handle errors in RxJs.

I hope that you have enjoyed this post on RxJS terms, operators and how to handle errors. Also, if you have some questions or comments please let me know in the comments. Thank you!

References:

--

--