Introduction to Reactive Extensions (RxJS)
Aug 15, 2019 • 12 Minute Read
Introduction
RxJs, or Reactive Extensions for Javascript, is Observables implementation for Javascript. RxJS has multiple implementations, across various languages e.g. RxJAVA, Rx.NET, etc.
Angular uses RxJS internally for some of its features. The RxJS library has methods to create and work with Observables. We can use these utility methods for converting existing code related to async operations into observables, iterating through values within some stream, etc.
Streams
Streams are nothing but a series of values over time. If, for example, there is a counter which increments by two every second, then that counter might have a stream as below:
0
There could even be a stream that corresponds to a user entering some values in a web app form. In this case, the stream could represent each keypress as shown below:
"A"
The stream could even correspond to JSON representation of our form as below;
{ "name": "A" }
Reactive Programming
With Reactive programming, we create our entire app by defining the various streams and the operations that get performed on those streams.
While that may be easy to understand on paper, how do we actually train ourselves to think and program reactively?
Let us try converting a basic imperative function to a reactive one.
adder (a, b) {
return a + b;
}
c = adder (10, 20);
c = adder (10, 40);
As shown above, we have got some state variables a, b and c. Also, there is a function called adder.
To add a and b together and update the state of variable c to be their addition, we call the function adder.
After some time, the value of variable b gets updated to 50.
Now, there are a couple of points to note here;
- We need to find out a way of knowing that variable b has got updated, which in itself is quite hard to figure out.
- We also need to know that since variable b has changed, we now have to recompute variable c.
Now the above program might be too simplistic. But when we extend that to web apps, we realize that the inputs get constantly updated over time through a user clicking somewhere, mouse events, network events, etc.
Most of our application logic is based on figuring out which methods need to be invoked for each of these changes that take place on our inputs.
When working with reactive programming, we do not really think in terms of variables but, instead, we think in terms of streams and how the various streams are connected together.
So, in our example, we convert a, b, and c into streams.
Thus, a is no longer an individual value at any point. Instead, it is a stream of values over time.
adder can be thought of as an operation that connects the output of both streams a and b to the input of c.
If some numbers are now pushed to streams a and b, the adder gets invoked automatically. It calculates the total and pushes the result to stream c.
If, for example, c is also connected to some other stream from some other operation (e.g. multiplier), in that case, even multiplier operation would get invoked automatically.
Thus, we can see that, in reactive programming, we do not invoke functions. Instead, we just define how our app is connected together and then start pushing values to different streams. The various operations will then automatically handle everything for our app.
Thus, if the value of variable b changes, all we need to do is push this new value to stream b and the adder operation would handle the tasks for us.
Observables
While streams are more of a concept and we connect streams together using operators e.g. adder we saw above
Observables represent a new primitive type that provides the blueprint for creating streams, subscribing to them, reacting to updated values, and merging different streams to create new ones.
Interval
We first have to get an instance of RxJS Observable. To do that, we say:
let observable = Rx.Observable;
Again, an observable is not a stream. But instead, it just describes various streams and how they are linked via operators.
Let's say, we want our observable to create a stream and push a new number every two seconds that is increased by 1.
With RxJS, we can make use of the operator interval, as shown below
let observable = Rx.Observable.interval(2000);
The interval takes the number of milliseconds as the first argument and that is between pushing a new number to the stream.
Also, operators return a new observable with the previous operator applied. Thus allowing us to chain operators together as seen in the example below;
let observable = Rx.Observable
.operator11();
.operator22();
.operator33();
.operator44();
.operator55();
Subscribe
The observable that we created above will not start pushing new numbers to the stream automatically. It will do so when it gets a subscriber, as shown below:
let observable = Rx.Observable
.interval(2000);
observable.subscribe(value => console.log("New value is : " + value));
Thus the observable will now start generating new numbers. We have also added a callback method so that we can react to that when any number gets pushed. Our output would now look like:
New value is: 0
New value is: 1
New value is: 2
New value is: 3
New value is: 4
New value is: 5
New value is: 6
New value is: 7
New value is: 8
New value is: 9
Promises
Promise class is part of most modern web browsers today. Thus, we do not have to add any explicit imports for using Promises.
Below is a simple example of using Promises:
/* create new promise object */
const promise = new Promise(resolve => {
setTimeout(() => {
resolve('Hey there, Welcome to the world of Promises!');
}, 1000)
});
/* Print the resolved value that gets emitted. */
promise.then(val => console.log("Resolved value is : " + val));
An async event (e.g. setTimeout() above) would cause the promise to be either resolved or rejected.
Example of Observable With Stream Of Values
We used a promise above for a single async value. We'll now see how Observables can be used to generate a stream of values that get emitted over time.
import { Observable } from "rxjs/Observable";
/* Create new Observable with the subscribe method */
const obs: Observable<string> = new Observable(observer => {
const interval = setInterval(() => {
observer.next('Hey there, Welcome to the world of Observable!');
}, 1000);
// clear out
return () => {
clearInterval(interval);
}
});
/* Adding Subscribe to receive notifications */
obs.subscribe(val => console.log(val));
With Observables, we need to add an explicit import for the Observable class, as they are not an ECMAScript standard yet.
We then create a new Observable and also add a subscribe method that gets called whenever the observer subscribes to the observable.
We can make use of the next() method on an Observer to emit values.
In the above example, we use setInterval() method to simulate async events that emit values over time. Here, after every second, we emit string “Hey there, Welcome to the world of Observable!”.
We then have a method to clear the interval that gets called whenever all the observers unsubscribe from the observable.
We also invoke the subscribe() function that gets called each time next() emits a value to all the observers.
Subject
A Subject is a special kind of observable that is both an observable and an observer. The Subject class extends Observable. Thus, it inherits the same properties and methods of an observable. if that sound a bit confusing, let's see with an example;
import { Subject } from "rxjs/Subject";
/* create a new instance of Subject */
const subject = new Subject<number>();
/* Subscribe to the above subject */
subject.subscribe(
next => console.log('value before subject 1:', next),
error => console.warn(error),
() => console.log('complete before subject 1')
);
subject.subscribe(
next => console.log('value before subject 2:', next),
error => console.warn(error),
() => console.log('complete before subject 2')
);
/* Start emitting some values */
subject.next(11);
subject.next(22);
subject.next(33);
subject.next(44);
/* Subscribe to our subject */
subject.subscribe(
next => console.log('after: ', next),
error => console.warn(error),
() => console.log('complete after')
);
/* Subscription would now start receiving notifications */
subject.next(55);
subject.complete();
Here again, we need to explicitly import the Subject class and we then create a new instance. Using TypeScript, we specify the generic type for our subject to be of type number. Thus, we can expect the emitted values by our observable to be type numbers. We then create two subscriptions and also add a callback method with three notifications, next, catch and complete, in the same order as args to our subscribe() function. We log out the notification results to the console. We also emit four different values using the next() : 11, 22, 33 and 44. We also add a 3rd subscription after the first four values are emitted. Then, we emit the fifth value using the next(): 55. We then complete() the observable stream. Thus, notifying all our observers that no more values willbe emitted by the stream.
Our output would now look like below:
value before subject 1: 11
value before subject 2: 11
value before subject 1: 22
value before subject 2: 22
value before subject 1: 33
value before subject 2: 33
value before subject 1: 44
value before subject 2: 44
value before subject 1: 55
value before subject 2: 55
after: 55
complete before subject 1
complete before subject 2
complete after
The order of subscribing to our observable is important as the third subscription receives the fourth value (44). Also, because the Subject is an observable, we can call the next() method to emit any additional values to our observers.
AsyncSubject
AsyncSubject extends the Subject and inherits all the methods and properties of the Subject. Let's take a look:
import { AsyncSubject } from "rxjs/AsyncSubject";
/* create instance of AsyncSubject */
const sub = new AsyncSubject<number>();
console.log("Welcome to the world of AsyncSubject");
/* Subscribe to our subject */
sub.subscribe(
next => console.log('subject before:', next),
error => console.warn("subject error : ", error),
() => console.log('value subject complete before')
);
/* Output/emit some values */
sub.next(11);
sub.next(22);
sub.next(33);
sub.next(44);
/* Subscribe later to our subject */
sub.subscribe(
next => console.log('subject after : ', next),
error => console.warn(error),
() => console.log('subject complete after')
);
/* complete our observable */
sub.complete();
The above example is quite similar to the earlier one on the Subject. The main difference being that AsyncSubject will only receive the last value emitted and that is only on completion.
The console output would look something like below:
Welcome to the world of AsyncSubject
subject before: 44
subject after: 44
value subject complete before
subject complete after
Also here the order of subscription to observable is not important. We can try commenting on the complete() method call. After doing that, we can see no values being printed in the console as our observers would never receive any notification.
Welcome to the world of AsyncSubject
Conclusion
Thus, we can see how Reactive programming is about visualizing our app as streams while we perform some operations on those streams. The RxJS library allows us to create and work with Observables. Angular uses a few APIs that make use of Observables. Some of the most common ones are EventEmitter and HTTP.