RxJS has a long list of operators. Some of the most powerful ones are not easy to grasp on the first attempt: they let you nest observables inside other observables.
Every example in this article starts from a “main observable” and adds nested observables to it.
Mapping
We’ll use an input field as the main observable — when the value of this input field changes, our main observable emits a new value:
import { FormControl } from '@angular/forms';

const field = new FormControl<string>('');
const mainObservable = field.valueChanges;
Let’s skip the part where we attach this control to a form and a template.
For now, we have mainObservable, which emits a new value every time the field control is modified.
Let’s say that this input field is a search input, and we want to find items in our database.
Using examples and definitions from the previous article, how would you classify mainObservable in this example: hot or cold?
So we want to display a list of possible matches while the user is typing in our search field.
const listOfItems = mainObservable.pipe(
  switchMap((fieldValue) => {
    // fieldValue - value emitted by `mainObservable`
    // we are using it to make a call
    return this.dbService.findByName(fieldValue);
  }),
  tap((items) => console.log('Found:', items))
);
What happens here:
Every time mainObservable emits a new value (which happens when the user types something into our field), the switchMap() operator receives it, and this is where we can react to it. Our reaction must produce an observable: this operator requires us to return one. From this point in the pipe, the operators after switchMap() observe the values emitted by the observable we returned, not the values emitted by mainObservable.
In this example, if we remove the switchMap() operator from the pipe, the only remaining operator is tap(), so every time mainObservable emits a new value, we’ll see in the console:
Found: some value of the input field
That’s because the value goes straight to tap() without being intercepted or modified.
But if we restore our switchMap(), then tap() will receive the response of dbService.findByName() instead.
When we use switchMap(), we map every value to a new observable, which is why switchMap() requires us to return an observable.
I’m intentionally not using the word “switching” here: we are mapping every emitted value to a new observable. This is an important thing to understand and remember.
const listOfItems = mainObservable.pipe(
  tap((value) => console.log('Value from mainObservable...', value)),
  // the next operator means:
  // map every new value from `mainObservable` to...
  switchMap((value) => {
    // ...the observable we return
    return this.dbService.findByName(value);
  }),
  tap((items) => console.log('...was mapped to:', items))
);
There are other mapping operators: concatMap(), exhaustMap(), and mergeMap(). They all do the same thing: they map every new value of mainObservable to a new observable.
Keeping this similarity in mind will help you understand their differences.
If you are not familiar with the term “mapping” (in programming), here is a short example:
const values = ['c', 'b', 'a'];
const items = {a: 'Apple', b: 'Banana', c: 'Carrot'};
const food = values.map(v => items[v]);
console.log(food);
// (3) ['Carrot', 'Banana', 'Apple']
Here every value of the values array is mapped to an item, and we use the results of this mapping.
In the same way, we map every value emitted by mainObservable to a new observable. For the next operator after switchMap(), this new observable becomes the source of values.
switchMap()
Our dbService.findByName() is very slow: it takes 5 seconds to return a response.
Users can type much faster than one character every 5 seconds, so a new value can arrive from the input field while we are still waiting for a response from dbService.findByName(). What happens then?
When this operator receives a new value, it immediately unsubscribes from the observable we returned for the previous value and switches to the new observable we return for the new value. In Angular, unsubscribing from an HttpClient request cancels that HTTP request.
Let’s look at the flow:
1. The user types “b”;
2. In switchMap() we return an observable that sends a request to the API to find items by name, using this value, “b”: dbService.findByName("b");
3. The user doesn’t wait and after just a second types another letter, “a”, so our field now has the value “ba”. Our mainObservable emits “ba”;
4. switchMap() unsubscribes from the observable we returned previously and asks for a new one, for the value “ba”: we return dbService.findByName("ba");
5. The user waits 5 seconds. Notice that we’ve sent 2 requests already, but we don’t have to wait 10 seconds: the first request is canceled as soon as a new value is emitted, so we only need to wait for one request;
6. We finally get our list of items;
7. The user types “nana” and we send a new request. This time there is nothing for switchMap() to unsubscribe from, because the previous request completed before the user started typing again.
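To check our reading of this flow, here is a small sketch in plain TypeScript. It is not RxJS: it is only a timeline model of the switching rule, under the assumption that every request takes the same time and that values arrive in time order. The names simulateSwitchMap, Emission and Delivery are made up for this illustration.

```typescript
// A plain-TypeScript model of the switching rule (not RxJS itself).
// Times are in milliseconds; every request is assumed to take `requestMs`.
interface Emission { at: number; value: string; }
interface Delivery { at: number; query: string; }

function simulateSwitchMap(emissions: Emission[], requestMs: number): Delivery[] {
  const delivered: Delivery[] = [];
  emissions.forEach((current, index) => {
    const finishesAt = current.at + requestMs;
    const next = emissions[index + 1];
    // A newer value that arrives before the response cancels this request.
    const cancelled = next !== undefined && next.at < finishesAt;
    if (!cancelled) {
      delivered.push({ at: finishesAt, query: current.value });
    }
  });
  return delivered;
}

const switchTimeline = simulateSwitchMap(
  [
    { at: 0, value: 'b' },         // request for "b" starts
    { at: 1000, value: 'ba' },     // cancels the request for "b"
    { at: 9000, value: 'banana' }, // typed after the "ba" response arrived
  ],
  5000,
);

console.log(switchTimeline);
// [ { at: 6000, query: 'ba' }, { at: 14000, query: 'banana' } ]
```

The response for “b” never shows up in the result: that request was dropped when “ba” arrived one second later.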
concatMap()
This operator also maps every value to a new observable.
The only difference is that it does not cancel the previous observable we returned from concatMap(). It waits until that observable completes, and only then subscribes to the observable for the new value.
Let’s replace switchMap() in the previous example with concatMap() and take a look at the flow:
1. The user types “b”;
2. In concatMap() we return dbService.findByName("b");
3. After 1 second, the user types “a”, and our mainObservable emits “ba”;
4. concatMap() waits for the observable mapped to “b” to complete, and then subscribes to the observable mapped to “ba”;
5. After 5 seconds of waiting, the user gets the list of items found for “b”, and after another 5 seconds, the list of items found for “ba”.
Of course, this is not the operator we want for “search” or “typeahead” functionality. But sometimes we do need to queue requests rather than cancel them, and to execute them one after another rather than in parallel. That’s when we should use concatMap().
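The queueing behavior can be sketched the same way. This is a plain TypeScript timeline model, not RxJS, and it assumes that every request takes the same time and that values arrive in time order. simulateConcatMap, Emission and Delivery are hypothetical names used only here.

```typescript
// A plain-TypeScript model of the queueing rule (not RxJS itself).
interface Emission { at: number; value: string; }
interface Delivery { at: number; query: string; }

function simulateConcatMap(emissions: Emission[], requestMs: number): Delivery[] {
  const delivered: Delivery[] = [];
  let freeAt = 0; // the moment the previous request completes
  for (const emission of emissions) {
    // A request starts when its value arrives or when the previous
    // request has completed, whichever happens later.
    const startsAt = Math.max(emission.at, freeAt);
    freeAt = startsAt + requestMs;
    delivered.push({ at: freeAt, query: emission.value });
  }
  return delivered;
}

const concatTimeline = simulateConcatMap(
  [
    { at: 0, value: 'b' },
    { at: 1000, value: 'ba' }, // has to wait until the "b" request completes
  ],
  5000,
);

console.log(concatTimeline);
// [ { at: 5000, query: 'b' }, { at: 10000, query: 'ba' } ]
```

Nothing is canceled and nothing runs in parallel, so the user sees both lists, 5 seconds apart.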
exhaustMap()
This operator does not cancel the previously mapped observable. Instead, it ignores any new values emitted by mainObservable until the previously mapped observable completes.
It is quite a useful operator for POST requests. When you create a new item, you’ve already sent all the data to your server in the first request, and you just want to receive the response. If the user accidentally clicks the “Create” button one more time (a double click?), you don’t need to send a second request or cancel the existing one.
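Here is the double-click scenario as a plain TypeScript timeline model. Again, this is not RxJS, only a sketch of the "ignore while busy" rule, assuming that every request takes the same time. The names simulateExhaustMap, Emission and Delivery are invented for this example.

```typescript
// A plain-TypeScript model of the "ignore while busy" rule (not RxJS itself).
interface Emission { at: number; value: string; }
interface Delivery { at: number; query: string; }

function simulateExhaustMap(emissions: Emission[], requestMs: number): Delivery[] {
  const delivered: Delivery[] = [];
  let busyUntil = -Infinity; // no request is in flight at the start
  for (const emission of emissions) {
    if (emission.at < busyUntil) {
      continue; // a request is still in flight: this value is ignored
    }
    busyUntil = emission.at + requestMs;
    delivered.push({ at: busyUntil, query: emission.value });
  }
  return delivered;
}

const exhaustTimeline = simulateExhaustMap(
  [
    { at: 0, value: 'click 1' },
    { at: 200, value: 'click 2' },  // accidental double click, ignored
    { at: 6000, value: 'click 3' }, // first request is done, so this one is sent
  ],
  5000,
);

console.log(exhaustTimeline);
// [ { at: 5000, query: 'click 1' }, { at: 11000, query: 'click 3' } ]
```

The second click produces neither a request nor a cancelation, which is what we want for a “Create” button.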
mergeMap()
This operator maps every new value emitted by mainObservable to a new observable and runs all of these observables concurrently. It is a perfect fit for operations that can run in parallel (for example, deletion of items).
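One consequence of running everything concurrently is that results can arrive in a different order than the values that triggered them. The sketch below models this in plain TypeScript (not RxJS). The task durations are made-up numbers, and simulateMergeMap, TimedTask and Completion are hypothetical names.

```typescript
// A plain-TypeScript model of concurrent execution (not RxJS itself).
interface TimedTask { at: number; label: string; takesMs: number; }
interface Completion { at: number; label: string; }

function simulateMergeMap(tasks: TimedTask[]): Completion[] {
  return tasks
    // every task starts as soon as its value arrives, nothing waits
    .map((task) => ({ at: task.at + task.takesMs, label: task.label }))
    // results are delivered in the order in which the tasks finish
    .sort((a, b) => a.at - b.at);
}

const mergeTimeline = simulateMergeMap([
  { at: 0, label: 'delete A', takesMs: 3000 },
  { at: 100, label: 'delete B', takesMs: 1000 },
]);

console.log(mergeTimeline);
// [ { at: 1100, label: 'delete B' }, { at: 3000, label: 'delete A' } ]
```

For deletions this reordering is usually harmless, but it is one more reason not to pick this operator for a search field.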
✨Bonus: debounceTime()
Some services are not as slow as our dbService.findByName(): they can return a result for almost every letter the user types into our input field.
It would be a waste of resources to run a search and return results just to instantly replace them with other results.
To avoid this, we can improve our code a little:
const listOfItems = mainObservable.pipe(
  debounceTime(250),
  // 👆
  switchMap((fieldValue) => {
    // fieldValue - value emitted by `mainObservable`
    // we are using it to make a call
    return this.dbService.findByName(fieldValue);
  }),
  tap((items) => console.log('Found:', items))
);
Here debounceTime(250) remembers the new value and starts a 250ms timer. If another value arrives before the timer fires, debounceTime() replaces the remembered value with the new one and restarts the timer, without emitting anything. Only when 250ms pass with no new values does debounceTime() emit the last value it received. So while the user keeps typing with pauses shorter than 250ms, nothing is emitted, and once the user pauses for 250ms, the latest value goes on to switchMap().
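The rule debounceTime() follows is that a value is emitted only after the source has stayed quiet for the given time. Here is a plain TypeScript timeline model of that rule. It is not RxJS and it ignores what happens when the source completes. simulateDebounce and Emission are hypothetical names, and the keystroke timings are invented.

```typescript
// A plain-TypeScript model of the debounce rule (not RxJS itself).
interface Emission { at: number; value: string; }

function simulateDebounce(emissions: Emission[], quietMs: number): Emission[] {
  const output: Emission[] = [];
  emissions.forEach((current, index) => {
    const next = emissions[index + 1];
    // A value is dropped if a newer one arrives before the quiet period ends.
    const superseded = next !== undefined && next.at - current.at < quietMs;
    if (!superseded) {
      output.push({ at: current.at + quietMs, value: current.value });
    }
  });
  return output;
}

const debounced = simulateDebounce(
  [
    { at: 0, value: 'b' },
    { at: 100, value: 'ba' },
    { at: 200, value: 'ban' },  // followed by a 500ms pause
    { at: 700, value: 'bana' }, // last keystroke
  ],
  250,
);

console.log(debounced);
// [ { at: 450, value: 'ban' }, { at: 950, value: 'bana' } ]
```

Four keystrokes turn into two searches: one for each pause that lasted at least 250ms.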
subscribe()
Observables are just collections of functions that will do nothing until you call subscribe().
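To make this laziness tangible, here is a deliberately tiny stand-in for an observable, written in plain TypeScript. It is not how RxJS is implemented. TinyObservable is a hypothetical type invented for this sketch, and "subscribing" is simply calling the function.

```typescript
// A hypothetical, stripped-down stand-in for an observable: a function that
// starts producing values only when it is called with a callback.
type TinyObservable<T> = (next: (value: T) => void) => void;

let producerRuns = 0;

const greetings: TinyObservable<string> = (next) => {
  producerRuns += 1; // this work happens only on subscription
  next('hello');
  next('world');
};

// Defining the observable did not run the producer.
const runsBeforeSubscribe = producerRuns;

const received: string[] = [];
greetings((value) => received.push(value)); // our "subscribe" call

console.log(runsBeforeSubscribe); // 0
console.log(producerRuns);        // 1
console.log(received);            // [ 'hello', 'world' ]
```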
In our example, we are observing FormControl.valueChanges. I’ve asked you to guess whether it’s a hot or a cold observable. It matters, because a cold observable starts producing values for each subscriber when that subscriber calls subscribe(), so we would never miss the first value. But that would mean that the producer is created just for us and that we have full control of every event.
That’s not the case with the input field: here users are the source of events, and they type whenever they want, whether or not anyone is subscribed, so FormControl.valueChanges is a hot observable. Therefore, we can’t expect to receive the first typed value after subscribe(): maybe it was typed before we subscribed. You should always keep this in mind when dealing with hot observables. If you need the current value as the first value, you can add it:
const listOfItems = mainObservable.pipe(
  startWith(field.value),
  // 👆
  debounceTime(250),
  switchMap((fieldValue) => {
    // fieldValue - value emitted by `mainObservable`
    // we are using it to make a call
    return this.dbService.findByName(fieldValue);
  }),
  tap((items) => console.log('Found:', items))
);
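If the "typed before we subscribed" problem feels abstract, this plain TypeScript sketch may help. It is not RxJS or Angular code: createHotSource is a hypothetical helper that imitates a hot source holding a current value, and seeding the array with getValue() stands in for what startWith(field.value) does in the pipe above.

```typescript
// A hypothetical stand-in for a hot source such as an input field:
// it emits whether or not anyone is listening.
function createHotSource<T>(initial: T) {
  let current = initial;
  const listeners: Array<(value: T) => void> = [];
  return {
    getValue: (): T => current,
    emit(value: T): void {
      current = value;
      listeners.forEach((listener) => listener(value));
    },
    subscribe(listener: (value: T) => void): void {
      listeners.push(listener);
    },
  };
}

const searchField = createHotSource<string>('');
searchField.emit('b'); // typed before anyone subscribed, so nobody hears it

const withoutStart: string[] = [];
searchField.subscribe((value) => withoutStart.push(value));

// Seeding with the current value plays the role of startWith(field.value).
const withStart: string[] = [searchField.getValue()];
searchField.subscribe((value) => withStart.push(value));

searchField.emit('ba');

console.log(withoutStart); // [ 'ba' ]
console.log(withStart);    // [ 'b', 'ba' ]
```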
And now, in your template, you can use something like this:
<ul *ngIf="listOfItems | async as items">
<li *ngFor="let item of items">{{item}}</li>
</ul>
This way you don’t even need to call subscribe(): the async pipe does it for you. You don’t need to care about unsubscribing either: the async pipe calls unsubscribe() when the component is destroyed.
In this article, you’ve learned some of the most powerful operators in RxJS — mapping operators, their similarities and differences.