If you have ever worked with Angular, you could not avoid RxJS. It is one of the most popular libraries in web development today, and it offers a convenient way to organize work with events and asynchronous code.
RxJS is everywhere in Angular, even if you prefer Promises to Observables. For example, Angular uses Observables for event handling. For junior web developers, or for anyone who has not read the technical specification, the whole concept can be difficult to understand. So we are going to explain it the easy way, using an analogy with a railroad.
So, imagine the app data flow as a railway with all surrounding infrastructure, such as stations and depots, railway personnel and everything that you see from the window of your train.
First, let’s consider the simplest case: we need to track clicks on a button and show a popup confirming that the button was pressed. The railroad analogy gives us a clear picture of what happens to the data and how it eventually reaches “subscribe”.
After the button is clicked, the data arrives at the depot and is loaded into a railway carriage. The carriage is then sent by rail to a station called “subscribe”. In the simplest case, the carriage travels straight from the depot to the “subscribe” station and is unloaded there. Once unloaded, the click data can be used by the application to display a popup message or window.
In your code, it might look like this:
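import { fromEvent } from 'rxjs';

const button = document.getElementById('clickElement');
// The depot: every click on the button becomes a carriage on the track.
fromEvent(button, 'click')
  .subscribe((event) => {
    // The station: showPopup is the application's own function, not part of RxJS.
    showPopup();
  });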
However, the carriage only rarely travels in a straight line from the depot to the station. Quite often we need to manipulate the data along the way. We will look at such cases in the rest of the article.
Data Transformation
One of the simplest manipulations we can perform is changing the data itself. Let’s see what that transformation looks like.
Our railway carriage with the click data has left the depot. Along the way, we decide to add an “id” field to its load, extending the data. Here the “map” operator helps: it transforms each value, for example by adding or removing fields.
Your code may look as follows:
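import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const button = document.getElementById('clickElement');
fromEvent(button, 'click')
  .pipe(
    // Wrap each click event in a new object that also carries a random id.
    map((event) => ({
      event,
      id: Math.floor(Math.random() * 1000000),
    })),
  )
  .subscribe((eventWithId) => {
    console.log(eventWithId);
  });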
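To make the reloading step more tangible, here is a minimal sketch that models the idea behind “map” without RxJS. It is a toy model only: `MapTrack`, `mapTrack` and `clickDepot` are hypothetical names invented for this illustration, and the real operator does considerably more (error handling, completion, unsubscription).

```typescript
// Toy model, not RxJS: a "track" is a function that delivers values to a station.
type MapStation<T> = (value: T) => void;
type MapTrack<T> = (station: MapStation<T>) => void;

// Hypothetical helper mimicking the idea behind the map operator:
// every carriage is reloaded by `project` before it travels on.
function mapTrack<T, R>(source: MapTrack<T>, project: (value: T) => R): MapTrack<R> {
  return (station) => source((value) => station(project(value)));
}

// A depot that dispatches three fake click payloads.
const clickDepot: MapTrack<{ type: string }> = (station) => {
  for (let i = 0; i < 3; i++) {
    station({ type: 'click' });
  }
};

// Sequential ids instead of Math.random() so the result is predictable.
let nextLoadId = 1;
const clicksWithId = mapTrack(clickDepot, (event) => ({ event, id: nextLoadId++ }));

// The "subscribe" station simply collects whatever arrives.
const receivedLoads: Array<{ event: { type: string }; id: number }> = [];
clicksWithId((load) => {
  receivedLoads.push(load);
});

console.log(receivedLoads.map((load) => load.id)); // logs the ids 1, 2 and 3
```

The point of the sketch is that the station never sees the original carriage, only the reloaded one, and the depot does not need to know that a transformation happens on the way.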
Merging the Values of Multiple Streams
When talking about RxJS, keep the notion of streams in mind. Each railway track along which our carriages move is called a stream, and every developer needs to know how to work with streams. The simplest thing you can do is combine data from two streams into one.
Why combine streams? It’s all about the resulting data. For example, in a fairly complex application the data often comes from more than one source and has to be combined before it is sent to the server.
Coming back to our railroad, imagine you have two depots, a railway junction called “combineLatest”, and a “subscribe” station. The carriages leave from different depots, since they carry different data. On entering the “combineLatest” junction, the carriages are joined into one train, which then travels to the “subscribe” station.
The code below uses “combineLatest” in its simplest form to combine the values of several streams.
import { combineLatest, fromEvent } from 'rxjs';

const button = document.getElementById('clickElement');
// Two separate depots; here both happen to listen to the same button.
const streamOne$ = fromEvent(button, 'click');
const streamTwo$ = fromEvent(button, 'click');
combineLatest([
  streamOne$,
  streamTwo$,
])
  .subscribe((events) => {
    console.log(events);
  });
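As a result, after subscribing and clicking the button, an array holding the latest value from each of the two streams is printed to the console. Note that “combineLatest” emits nothing until every stream has produced at least one value.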
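How the junction decides when a train may leave is easier to see with two depots that send carriages at different times. The sketch below is a simplified, dependency-free model of that rule, not the RxJS implementation; `Arrival` and `combineLatestToy` are hypothetical names, and the arrival times are made up for the example.

```typescript
// Toy model of the combineLatest idea (not the RxJS implementation).
// Each arrival says when a carriage reaches the junction and from which track.
type Arrival = { at: number; track: 0 | 1; value: string };

function combineLatestToy(arrivals: Arrival[]): Array<[string, string]> {
  // The junction remembers the most recent carriage from each track.
  const latest: Array<string | undefined> = [undefined, undefined];
  const trains: Array<[string, string]> = [];
  const ordered = [...arrivals].sort((a, b) => a.at - b.at);

  for (const arrival of ordered) {
    latest[arrival.track] = arrival.value;
    const [first, second] = latest;
    // A train leaves only once both tracks have delivered at least one carriage.
    if (first !== undefined && second !== undefined) {
      trains.push([first, second]);
    }
  }
  return trains;
}

const combinedTrains = combineLatestToy([
  { at: 1, track: 0, value: 'A1' },
  { at: 2, track: 0, value: 'A2' },
  { at: 3, track: 1, value: 'B1' },
  { at: 4, track: 0, value: 'A3' },
]);

console.log(combinedTrains); // two trains: A2 with B1, then A3 with B1
```

Notice that 'A1' never reaches the station: by the time the second track delivered its first carriage, 'A2' had already replaced it as the latest value.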
Hot and Cold Observables
You will run into cold and hot observables quite often while using RxJS, so it is important to understand the difference between them.
First, let's find out what a cold observable is, using our railway as an example. For this we will use the “interval” operator. As its parameter, “interval” takes the number of milliseconds between emissions. In our example, “interval” will send numbers to the depot every second, starting from 0 and increasing by one each time (1, 2, 3, ...).
const data$ = interval(1000);
At this stage, we have an interesting situation. As described above, “interval” is ready to send data to the depot every second. However, it cannot do so yet, because the depot does not know where the carriages should go, so no data is sent. To fix this, we need a subscription, that is, a call to “subscribe”.
Once the depot knows which station to send the carriages to, “interval” begins supplying data to the depot, which forwards it to the “subscribe” station.
At this point, we introduce “setTimeout”. Its role is to tell the depot about the station (where the carriages with data must be sent) only after a certain delay. The example below shows that “interval” starts sending data to the depot only 3 seconds after the application starts.
import { interval } from 'rxjs';

const data$ = interval(1000);
setTimeout(() => {
  console.log('3 seconds passed after the application start');
  data$.subscribe((count) => {
    console.log('data arrived at the station 1 - ' + count);
  });
}, 3000);
// With output to console:
// 3 seconds passed after the application start
// data arrived at the station 1 - 0
// data arrived at the station 1 - 1
// data arrived at the station 1 - 2
// data arrived at the station 1 - 3
// data arrived at the station 1 - 4
// data arrived at the station 1 - 5
// and so on...
Let’s add one more railroad station and take a look at the changes we will have:
import { interval } from 'rxjs';

const data$ = interval(1000);
setTimeout(() => {
  console.log('3 seconds passed after the application start');
  data$.subscribe((count) => {
    console.log('data arrived at the station 1 - ' + count);
  });
}, 3000);
setTimeout(() => {
  console.log('5 seconds passed after the application start');
  data$.subscribe((count) => {
    console.log('data arrived at the station 2 - ' + count);
  });
}, 5000);
// With output to console:
// 3 seconds passed after the application start
// data arrived at the station 1 - 0
// 5 seconds passed after the application start
// data arrived at the station 1 - 1
// data arrived at the station 1 - 2
// data arrived at the station 2 - 0
// data arrived at the station 1 - 3
// data arrived at the station 2 - 1
// data arrived at the station 1 - 4
// data arrived at the station 2 - 2
// data arrived at the station 1 - 5
// data arrived at the station 2 - 3
// and so on...
So after adding the second station we have:
For the first station, “interval” starts sending data to the depot 3 seconds after the application starts
For the second station, “interval” starts sending data to the depot 5 seconds after the application starts
If we look at the console, we see that “interval” starts generating data again, from 0, for the second station. This is the essence of a cold observable: “interval” generates data for each station (“subscribe”) separately, and generation begins only once the depot learns about that station.
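The two properties of a cold source (nothing is produced without a station, and every station gets its own run starting from 0) can be checked with a small synchronous model. This is a sketch under simplifying assumptions, not how “interval” is implemented: there is no timer, and `coldCounter`, `producedCount` and the station arrays are hypothetical names.

```typescript
// Toy model of a cold source (not RxJS): every subscription starts its own counter.
type ColdStation = (count: number) => void;

// How many values the toy "interval" has generated in total.
let producedCount = 0;

function coldCounter(ticks: number): (station: ColdStation) => void {
  return (station) => {
    // A fresh loop runs for each station, so each one starts again from 0.
    for (let count = 0; count < ticks; count++) {
      producedCount++;
      station(count);
    }
  };
}

const coldData = coldCounter(3);

// No station yet, so nothing has been generated.
const producedBeforeSubscribe = producedCount;

const coldStationOne: number[] = [];
const coldStationTwo: number[] = [];
coldData((count) => {
  coldStationOne.push(count);
});
coldData((count) => {
  coldStationTwo.push(count);
});

console.log(producedBeforeSubscribe); // 0
console.log(coldStationOne, coldStationTwo); // both stations hold 0, 1, 2
console.log(producedCount); // 6, because the values were generated twice
```

The last number is the cost of a cold source with several stations: the work is repeated for each of them.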
Now that we know what a cold observable is, let’s heat it up to get a hot observable. To do that, we need two new railway junctions: “publish” and “connect”.
There is a TypeScript typing issue when the “publish” operator is used inside “pipe()”: “pipe()” always returns the type “Observable”, but to call the “connect” method the variable must be of type “ConnectableObservable”. In our case, we solve this by manually casting the result of “pipe()” to “ConnectableObservable<number>”. (In RxJS 7 and later, “publish” is marked as deprecated in favor of alternatives such as “connectable” and “share”; the examples here use the older API.)
import { ConnectableObservable, interval } from 'rxjs';
import { publish } from 'rxjs/operators';

const data$ = interval(1000).pipe(
  publish(),
) as ConnectableObservable<number>;
// From this point on, "interval" runs whether or not any station exists.
data$.connect();
setTimeout(() => {
  console.log('3 seconds passed after the application start');
  data$.subscribe((count) => {
    console.log('data arrived at the station 1 - ' + count);
  });
}, 3000);
setTimeout(() => {
  console.log('5 seconds passed after the application start');
  data$.subscribe((count) => {
    console.log('data arrived at the station 2 - ' + count);
  });
}, 5000);
// With output to console:
// 3 seconds passed after the application start
// data arrived at the station 1 - 3
// data arrived at the station 1 - 4
// 5 seconds passed after the application start
// data arrived at the station 1 - 5
// data arrived at the station 2 - 5
// data arrived at the station 1 - 6
// data arrived at the station 2 - 6
// data arrived at the station 1 - 7
// data arrived at the station 2 - 7
// and so on...
As you can see, there are two major changes:
The first station received data starting from 3, not from 0
The same data arrived at both stations
After adding “publish” and “connect”, our railroad behaves quite differently. Now “interval” does not care whether there are any stations. It starts generating data as soon as “connect” makes a request to the depot. The depot sends the data only as far as the “publish” railway junction, which forwards each value to whatever stations exist at that moment. Values that arrive while no station exists go no further.
Let’s get back to our example and follow each railroad carriage.
- “interval” waits until “connect” sends a command to start loading data
  const data$ = interval(1000);
- We add the railway junction “publish”
  const data$ = interval(1000).pipe(
    publish(),
  ) as ConnectableObservable<number>;
- “connect” tells the depot that data can be sent
  data$.connect();
- “interval” generates 0 and it goes to the railway junction “publish”; no station exists yet, so it goes no further
- “interval” generates 1 and it goes to the railway junction “publish”
- “interval” generates 2 and it goes to the railway junction “publish”
- 3 seconds after the application starts, the 1st “subscribe” station appears
- “interval” generates 3 and it goes to the railway junction “publish”
- “publish” sees the new station and sends 3 to it
- “interval” generates 4 and it goes to the railway junction “publish”
- “publish” sends 4 to the first station
- 5 seconds after the application starts, the 2nd “subscribe” station appears
- “interval” generates 5 and it goes to the railway junction “publish”
- “publish” now sees both stations and sends 5 to them simultaneously
- “interval” generates 6 and it goes to the railway junction “publish”
- “publish” sends 6 to both stations simultaneously
So, we can say that we have converted a cold observable into a hot one. Now “interval” generates data continuously and does not wait for a station to request it. This behavior is what characterizes hot observables.
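The walkthrough above can be replayed in a few lines with a toy junction. This is only a sketch of the behavior described here, not the internals of “publish”: `ToyJunction` is a hypothetical class, and each call to `tick()` stands in for one firing of the “interval” timer.

```typescript
// Toy model of publish + connect (not RxJS internals).
type HotStation = (count: number) => void;

class ToyJunction {
  private stations: HotStation[] = [];
  private counter = 0;
  private connected = false;

  // "connect" tells the depot that data can be sent.
  connect(): void {
    this.connected = true;
  }

  // A new "subscribe" station appears on the line.
  subscribe(station: HotStation): void {
    this.stations.push(station);
  }

  // One tick of the toy "interval"; values go only to stations that exist right now.
  tick(): void {
    if (!this.connected) {
      return;
    }
    const value = this.counter++;
    for (const station of this.stations) {
      station(value);
    }
  }
}

const junction = new ToyJunction();
const hotStationOne: number[] = [];
const hotStationTwo: number[] = [];

junction.connect();
junction.tick(); // 0, no station yet
junction.tick(); // 1, no station yet
junction.tick(); // 2, no station yet
junction.subscribe((count) => {
  hotStationOne.push(count);
});
junction.tick(); // 3 goes to station 1
junction.tick(); // 4 goes to station 1
junction.subscribe((count) => {
  hotStationTwo.push(count);
});
junction.tick(); // 5 goes to both stations
junction.tick(); // 6 goes to both stations

console.log(hotStationOne); // 3, 4, 5, 6
console.log(hotStationTwo); // 5, 6
```

Compared with the cold model, there is a single counter shared by everyone, which is why the second station joins at 5 instead of starting again from 0.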
In a real-life project, when you have a cold Observable and several stations (subscribers), you usually do not want the values to be generated again for each station; you want the stations to share the same values. In this case, you should consider using “publish”.
API requests illustrate this situation well:
import { ajax } from 'rxjs/ajax';

const get$ = ajax('http://host/api/users');
get$.subscribe((res) => {
  console.log('data arrived at the 1st station from the server');
});
get$.subscribe((res) => {
  console.log('data arrived at the 2nd station from the server');
});
// With output to console:
// data arrived at the 1st station from the server
// data arrived at the 2nd station from the server
At first glance, everything works perfectly: data comes from the server, is loaded into carriages, and is sent to 2 stations. On closer examination, however, we see that 2 requests are sent to the server, because the data is generated for each station separately. To get rid of the unnecessary request, we must use “publish” (that is, make the observable hot).
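import { ConnectableObservable } from 'rxjs';
import { ajax, AjaxResponse } from 'rxjs/ajax';
import { publish } from 'rxjs/operators';

// AjaxResponse is the element type in RxJS 6; in RxJS 7 it takes a type argument (AjaxResponse<T>).
const get$ = ajax('http://host/api/users').pipe(
  publish(),
) as ConnectableObservable<AjaxResponse>;
// A single request is sent here; the response arrives asynchronously,
// after both stations below have subscribed.
get$.connect();
get$.subscribe((res) => {
  console.log('data arrived at the 1st station from the server');
});
get$.subscribe((res) => {
  console.log('data arrived at the 2nd station from the server');
});
// Order of events (only the "data arrived" lines are printed to the console):
// executed server request
// 1st railroad station appeared
// 2nd railroad station appeared
// data arrived at the 1st station from the server
// data arrived at the 2nd station from the server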
Now that our observable has become hot, only one request is sent to the server, when “connect” asks for the data.
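The saving is easy to verify by counting how often the server is hit. The sketch below does that without a network or RxJS, so treat it as an illustration only: `fakeRequest`, `coldGet` and `sharedGet` are hypothetical stand-ins, and the payload is invented. Because the fake response is available immediately, `connect()` is called after both stations have subscribed; in the snippet above the order can be reversed only because the real response arrives asynchronously.

```typescript
// Toy model (no network, no RxJS): count how often the "server" is hit.
type UsersStation = (users: string[]) => void;

let requestCount = 0;
const fakeRequest = (): string[] => {
  requestCount++;
  return ['alice', 'bob']; // hypothetical payload
};

// Cold: each subscription triggers its own request.
const coldGet = (station: UsersStation): void => station(fakeRequest());
coldGet(() => {});
coldGet(() => {});
const coldRequests = requestCount;

// Hot: one request on connect, and the result is fanned out to every station.
requestCount = 0;
const sharedStations: UsersStation[] = [];
const sharedGet = {
  subscribe(station: UsersStation): void {
    sharedStations.push(station);
  },
  connect(): void {
    const users = fakeRequest();
    sharedStations.forEach((station) => station(users));
  },
};

const deliveredTo: string[] = [];
sharedGet.subscribe(() => {
  deliveredTo.push('1st station');
});
sharedGet.subscribe(() => {
  deliveredTo.push('2nd station');
});
sharedGet.connect();
const sharedRequests = requestCount;

console.log(coldRequests); // 2
console.log(sharedRequests); // 1
console.log(deliveredTo); // both stations received the data
```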
Of course, we have covered only a small part of what you need when working on a real project. We did not cover RxJS operators such as publishBehavior(), publishLast(), publishReplay(), share(), refCount(), etc. But we discussed “publish” and “connect” in detail, and they are the basis for those operators, so you are now in a good position to learn about them.
Conclusion
This article was intended to provide a basic understanding of RxJS Observables and operators. It does not cover many of the fundamentals of reactive programming. However, it may be a good starting point for those who have not yet had a chance to get familiar with RxJS operators. We hope it motivates you to keep expanding your knowledge in this field. Thank you for your attention, and see you soon!
If you still have any questions, do not hesitate to contact our team, and we will be glad to help.