Asynchronous JavaScript Programming (Callback, Promise, RxJs)

Hello, this is Omelnitsky Sergey. Not long ago I hosted a live stream on reactive programming, where I talked about asynchrony in JavaScript. Today I would like to write that material up.

Before we get to the main material, we need a short introduction. Let's start with the definitions: what are a stack and a queue?

A stack is a collection whose elements are retrieved on the "last in, first out" (LIFO) principle.

A queue is a collection whose elements are retrieved on the "first in, first out" (FIFO) principle.
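As a small illustration of the two definitions, here is a sketch that uses a plain JavaScript array in both roles. The variable names are made up for the example.

```javascript
// Stack (LIFO): push adds to the top, pop removes from the top.
const stack = [];
stack.push('a');
stack.push('b');
stack.push('c');
const fromStack = stack.pop(); // the last element added

// Queue (FIFO): push adds to the tail, shift removes from the head.
const queue = [];
queue.push('a');
queue.push('b');
queue.push('c');
const fromQueue = queue.shift(); // the first element added

console.log(fromStack, fromQueue); // prints: c a
```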

Ok, let's continue.

JavaScript is a single-threaded programming language. This means that it has only one thread of execution and one stack onto which functions are placed for execution. Therefore, at any given moment JavaScript can perform only one operation, while other operations wait for their turn until they are called.

The call stack is a data structure that, in simple terms, records where in the program we currently are. When we enter a function, we put a record about it on top of the stack. When we return from the function, we pop the topmost element from the stack and end up where the function was called from. That is all the stack can do. And now an extremely interesting question: how, then, does asynchrony work in JavaScript?

In fact, in addition to the stack, browsers have a special queue for working with the so-called Web API. Functions from this queue are executed in order, and only after the stack has been completely cleared. Only then are they moved from the queue onto the stack for execution. While there is at least one element on the stack, they cannot get onto it. This is exactly why calling functions by timeout is often imprecise: a function cannot move from the queue to the stack until the stack is empty.
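A minimal sketch of this rule, assuming a browser or Node.js environment: even with a delay of 0 ms, the timer callback cannot run until the synchronous code has finished and the stack is empty. The `order` array and the busy loop are only there for illustration.

```javascript
const order = [];

order.push('start');

// Ask for the callback "immediately" (0 ms delay).
setTimeout(() => {
  order.push('timeout');
  console.log(order.join(' -> ')); // prints: start -> end -> timeout
}, 0);

// Keep the stack busy with synchronous work; the callback has to wait.
let sum = 0;
for (let i = 0; i < 1e6; i++) sum += i;

order.push('end');
```

At the moment the synchronous part ends, `order` contains only 'start' and 'end'; the callback is still waiting in the queue.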

Consider the following example and "execute" it step by step, watching what happens in the system.

console.log('Hi');
setTimeout(function cb1() {
  console.log('cb1');
}, 5000);
console.log('Bye');

1) So far, nothing is happening. The browser console is clean, the call stack is empty.

2) Then the console.log('Hi') command is added to the call stack.

3) It is executed.

4) Then console.log('Hi') is removed from the call stack.

5) Now we move on to the setTimeout(function cb1() {...}) command. It is added to the call stack.

6) The setTimeout(function cb1() {...}) command is executed. The browser creates a timer that is part of the Web API. The timer does the countdown.

7) The setTimeout(function cb1() {...}) command has finished its work and is removed from the call stack.

8) The console.log('Bye') command is added to the call stack.

9) The console.log('Bye') command is executed.

10) The console.log('Bye') command is removed from the call stack.

11) After at least 5000 ms have passed, the timer finishes and places the cb1 callback in the callback queue.

12) The event loop takes the cb1 function from the callback queue and places it on the call stack.

13) The cb1 function is executed and adds console.log('cb1') to the call stack.

14) The console.log('cb1') command is executed.

15) The console.log('cb1') command is removed from the call stack.

16) The cb1 function is removed from the call stack.

Well, we have looked at how asynchrony is implemented in JavaScript. Now let's talk briefly about the evolution of asynchronous code.

The evolution of asynchronous code.

a(function (resultsFromA) {
  b(resultsFromA, function (resultsFromB) {
    c(resultsFromB, function (resultsFromC) {
      d(resultsFromC, function (resultsFromD) {
        e(resultsFromD, function (resultsFromE) {
          f(resultsFromE, function (resultsFromF) {
            console.log(resultsFromF);
          })
        })
      })
    })
  })
});

Asynchronous programming as we know it in JavaScript can only be implemented with functions. They can be passed to other functions like any other variable. That is how callbacks were born. And it is cool, fun and playful, until it turns into sadness, melancholy and gloom. Why? It's simple: as the logic grows, callbacks end up nested inside callbacks, as in the code above, and the code becomes hard to read and maintain.

With Promise, things got a little better.

new Promise(function (resolve, reject) {
  setTimeout(() => resolve(1), 2000);
}).then((result) => {
  alert(result);
  return result + 2;
}).then((result) => {
  throw new Error('FAILED HERE');
  alert(result);
  return result + 2;
}).then((result) => {
  alert(result);
  return result + 2;
}).catch((e) => {
  console.log('error: ', e);
});

But a promise has its limitations. For example, a promise cannot be cancelled without jumping through hoops, and, most importantly, it works with a single value.
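The "single value" limitation can be seen in a short sketch: a promise settles exactly once, so any further calls to resolve are silently ignored. The names `onceOnly`, `received` and `done` are made up for the example.

```javascript
// A promise settles exactly once: later calls to resolve are ignored.
const onceOnly = new Promise((resolve) => {
  resolve(1);
  resolve(2); // ignored, the promise is already fulfilled
  resolve(3); // ignored as well
});

const received = [];
const done = onceOnly.then((value) => {
  received.push(value);
  console.log(received.length, received[0]); // prints: 1 1
  return received;
});
```

To deliver a series of values over time, a different abstraction is needed, which is where streams come in.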

Well, we have smoothly arrived at reactive programming. Tired? The good news is that you can go make some tea, think it over and come back to read on. And I will continue.

Reactive programming basics

Reactive programming is a programming paradigm focused on data streams and the propagation of change. Let's take a closer look at what a data stream is.

// Get a reference to the input element
const input = document.querySelector('input');

const eventsArray = [];

// Push every keyup event into eventsArray
input.addEventListener('keyup',
  event => eventsArray.push(event)
);

Imagine that we have an input field. We create an array, and on every keyup event of the input we save the event object in that array. Note that the array is ordered by time, i.e. later events have a greater index than earlier ones. Such an array is a simplified model of a data stream, but it is not a stream yet. For this array to be safely called a stream, it must be able to somehow inform subscribers that new data has arrived. This brings us to the definition of a stream.
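The missing piece, notifying subscribers, can be sketched in a few lines of plain JavaScript. This is only an illustration of the idea: `createEventLog` and its methods are hypothetical names, not part of any library.

```javascript
// Hypothetical helper: a time-ordered array that can notify subscribers.
function createEventLog() {
  const events = [];      // ordered by arrival time, like eventsArray
  const subscribers = []; // callbacks to call on every new event

  return {
    push(event) {
      events.push(event);
      subscribers.forEach((notify) => notify(event));
    },
    subscribe(notify) {
      subscribers.push(notify);
    },
    snapshot() {
      return events.slice(); // copy of everything stored so far
    },
  };
}

const log = createEventLog();
const seen = [];
log.subscribe((event) => seen.push(event));

log.push('k');
log.push('e');
log.push('y');

console.log(seen.join('')); // prints: key
```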

Data stream

const { interval } = Rx;
const { take } = RxOperators;

interval(1000).pipe(
  take(4)
)

A stream is an array of data ordered by time that can signal that the data has changed. Now imagine how convenient it is to write code in which a single action needs to trigger several events in different parts of the code. We simply subscribe to the stream, and it lets us know when changes occur. The RxJS library can do exactly this.

RxJS is a library for working with asynchronous and event-based programs using observable sequences. The library provides one core type, Observable, several auxiliary types (Observer, Schedulers, Subjects) and operators for working with events as with collections (map, filter, reduce, every and the like from JavaScript Array).

Let's look at the basic concepts of this library.

Observable, Observer, Producer

Observable is the first basic type we will look at. This class contains the bulk of the RxJS implementation. It represents an observable stream, which you can subscribe to using the subscribe method.

Observable implements an auxiliary mechanism for creating updates, the so-called Observer. The source of values for an Observer is called a Producer. It can be an array, an iterator, a web socket, some kind of event, etc. So we can say that an Observable is a conduit between the Producer and the Observer.

Observable handles three types of Observer events: next (a new value), error (an error) and complete (the end of the stream).

Let's see the demo:

First we handle the values 1, 2, 3; one second later we get 4 and complete our stream.
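The behaviour described here can be approximated with a hand-rolled sketch in plain JavaScript. It is not the RxJS implementation: `createObservable` is a hypothetical helper in which subscribing simply runs the producer function and hands it the observer.

```javascript
// Hypothetical minimal observable: subscribe() runs the producer
// and passes it the observer.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer);
    },
  };
}

const demo$ = createObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

const demoLog = [];
demo$.subscribe({
  next: (value) => demoLog.push('next ' + value),
  error: (err) => demoLog.push('error ' + err),
  complete: () => {
    demoLog.push('complete');
    console.log(demoLog.join(', '));
    // prints: next 1, next 2, next 3, next 4, complete
  },
});
```

The first three values arrive synchronously during subscribe; the fourth value and the completion arrive a second later.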

Thinking out loud

And then I realized that talking about it was more interesting than writing about it. :D


When we subscribe to a stream, we create a new Subscription object, which lets us unsubscribe using the unsubscribe method. We can also group subscriptions using the add method, and, logically enough, ungroup them using remove. Both add and remove take another subscription as input. Note that when we unsubscribe, we also unsubscribe from all child subscriptions, as if unsubscribe had been called on each of them. Let's move on.
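The grouping behaviour can be sketched with a small stand-in class. `SketchSubscription` is a hypothetical name and a simplification; it is not the RxJS Subscription class, it only mimics the add, remove and unsubscribe behaviour described above.

```javascript
// Hypothetical stand-in for a subscription.
class SketchSubscription {
  constructor(teardown = () => {}) {
    this.closed = false;
    this.teardown = teardown;   // what to do when unsubscribing
    this.children = new Set();  // grouped child subscriptions
  }
  add(child) {
    this.children.add(child);
  }
  remove(child) {
    this.children.delete(child);
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    this.teardown();
    // Unsubscribing the parent also unsubscribes every remaining child.
    this.children.forEach((child) => child.unsubscribe());
  }
}

const released = [];
const parent = new SketchSubscription(() => released.push('parent'));
const childA = new SketchSubscription(() => released.push('childA'));
const childB = new SketchSubscription(() => released.push('childB'));

parent.add(childA);
parent.add(childB);
parent.remove(childB); // childB is no longer tied to the parent

parent.unsubscribe();
console.log(released.join(', ')); // prints: parent, childA
```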

Types of streams

Hot stream | Cold stream
Producer is created outside the observable | Producer is created inside the observable
Data is emitted from the moment the observable is created | Data is emitted at the moment of subscription
Needs extra logic to unsubscribe | The stream completes on its own
Uses a one-to-many relationship | Uses a one-to-one relationship
All subscriptions receive the same value | Subscriptions are independent
Data can be lost if there is no subscription | Re-emits all stream values for each new subscription

To give an analogy, I would picture a hot stream as a film in a cinema: you start watching from whatever moment you walked in. I would compare a cold stream to a call to tech support: every caller hears the answering machine message from start to finish, but can hang up using unsubscribe.
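The difference can be sketched without any library. In this simplified illustration, `coldCounter` and `hotSource` are hypothetical helpers: the cold one creates its producer inside subscribe, the hot one emits regardless of who is listening.

```javascript
// Cold: the producer (the list of values) is created inside subscribe,
// so every subscriber gets its own run from the beginning.
function coldCounter() {
  return {
    subscribe(next) {
      [1, 2, 3].forEach((value) => next(value));
    },
  };
}

// Hot: the producer lives outside; subscribers only see what is emitted
// after they subscribed.
function hotSource() {
  const listeners = [];
  return {
    emit(value) {
      listeners.forEach((next) => next(value));
    },
    subscribe(next) {
      listeners.push(next);
    },
  };
}

const coldFirst = [];
const coldSecond = [];
const cold = coldCounter();
cold.subscribe((v) => coldFirst.push(v));
cold.subscribe((v) => coldSecond.push(v));

const hotEarly = [];
const hotLate = [];
const hot = hotSource();
hot.emit(1); // nobody is listening yet: the value is lost
hot.subscribe((v) => hotEarly.push(v));
hot.emit(2);
hot.subscribe((v) => hotLate.push(v));
hot.emit(3);

console.log(coldFirst.join(), coldSecond.join()); // prints: 1,2,3 1,2,3
console.log(hotEarly.join(), hotLate.join());     // prints: 2,3 3
```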

I would also like to note that there are so-called warm streams (I have come across this term extremely rarely, and only in foreign communities): a stream that turns from cold into hot. The question arises: where would you use it? Here is an example from practice.

I work with Angular, which uses RxJS heavily. To get data from the server, I expect a cold stream, and I use this stream in the template via AsyncPipe. If I use this pipe several times, then, going back to the definition of a cold stream, each pipe will request the data from the server, which is strange to say the least. If I convert the cold stream into a warm one, the request happens only once.
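A rough sketch of the idea, in plain JavaScript rather than Angular: a cold source that performs a "request" on every subscription, and a hypothetical `shareLatest` helper that subscribes to it only once and hands the result to later subscribers. In RxJS this role is usually played by operators such as share or shareReplay; the helper below is only an approximation of that behaviour, and the data is made up.

```javascript
let requestCount = 0;

// Hypothetical cold source: every subscribe triggers a new "request".
const coldRequest = {
  subscribe(next) {
    requestCount += 1;
    next({ data: ['company A', 'company B'] });
  },
};

// Hypothetical helper: subscribe to the source once, on the first
// subscriber, and replay the last result to everyone who comes later.
function shareLatest(source) {
  let started = false;
  let hasValue = false;
  let lastValue;
  const listeners = [];
  return {
    subscribe(next) {
      listeners.push(next);
      if (hasValue) next(lastValue);
      if (!started) {
        started = true;
        source.subscribe((value) => {
          hasValue = true;
          lastValue = value;
          listeners.forEach((listener) => listener(value));
        });
      }
    },
  };
}

const shared = shareLatest(coldRequest);
const templateA = [];
const templateB = [];
shared.subscribe((response) => templateA.push(response.data.length));
shared.subscribe((response) => templateB.push(response.data.length));

console.log(requestCount); // prints: 1
```

Subscribing to `coldRequest` directly twice would raise `requestCount` to 2, which is the situation described above.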

In general, understanding stream types is quite hard for beginners, but it is important.


return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
  .pipe(
    tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
    map(({ data }: TradeCompanyList) => data)
  );

Operators give us the ability to work with streams. They help control the events flowing through an Observable. We will look at a few of the most popular ones; more details on operators can be found via the links in the useful information section.

Operators - of

Let's start with the auxiliary operator of. It creates an Observable based on a simple value.

Operators - filter

The filter operator, as the name implies, filters the stream's signals. If the predicate returns true, the value is passed on.

Operators - take

take accepts the number of emissions to let through, after which the stream completes.
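To make the three operators above more tangible, here is a hand-rolled sketch on a synchronous source. The helpers `ofSketch`, `filterSketch` and `takeSketch` are hypothetical and heavily simplified; they only mimic the described behaviour and are not the RxJS implementations. A stream is modelled as a function that accepts an observer.

```javascript
// of: emit the given values one by one, then complete.
const ofSketch = (...values) => (observer) => {
  for (const value of values) {
    if (observer.closed) return; // downstream no longer wants values
    observer.next(value);
  }
  if (!observer.closed) observer.complete();
};

// filter: pass a value on only if the predicate returns true.
const filterSketch = (predicate) => (source) => (observer) =>
  source({
    get closed() { return observer.closed; },
    next: (value) => { if (predicate(value)) observer.next(value); },
    complete: () => observer.complete(),
  });

// take: let `count` values through, then complete the stream.
const takeSketch = (count) => (source) => (observer) => {
  let seen = 0;
  const inner = {
    closed: false,
    next(value) {
      if (inner.closed) return;
      seen += 1;
      observer.next(value);
      if (seen >= count) {
        inner.closed = true;
        observer.complete();
      }
    },
    complete() {
      if (!inner.closed) {
        inner.closed = true;
        observer.complete();
      }
    },
  };
  source(inner);
};

const picked = [];
const numbers = ofSketch(1, 2, 3, 4, 5, 6);
const evens = filterSketch((n) => n % 2 === 0)(numbers);
const firstTwoEvens = takeSketch(2)(evens);

firstTwoEvens({
  next: (value) => picked.push(value),
  complete: () => picked.push('done'),
});

console.log(picked.join(', ')); // prints: 2, 4, done
```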

Operators - debounceTime

debounceTime discards emitted values that are followed by another value within the specified time span. Once the time span passes without a new emission, it emits the last value.

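const { Observable } = Rx;
const { debounceTime, take } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next(i++);
  }, 1000);
  // Emit a value every 1500 ms
  setInterval(() => {
    observer.next(i++);
  }, 1500);
}).pipe(
  debounceTime(700), // wait for 700 ms of silence before passing a value on
  take(3)
);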
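The debounce rule can also be checked on paper. The sketch below does not use timers at all: it takes an assumed timeline of emissions and computes which values a debounce with a given quiet period would let through. `debounceTimeline` is a hypothetical helper, and the timeline is made up; a real stream would also need to handle completion, which is ignored here.

```javascript
// Hypothetical helper: given emissions as { time, value } sorted by time,
// return the values that survive a debounce with the given quiet period.
function debounceTimeline(emissions, quietMs) {
  const passed = [];
  emissions.forEach((current, index) => {
    const next = emissions[index + 1];
    // A value survives only if nothing else arrives within quietMs after it.
    if (!next || next.time - current.time >= quietMs) {
      passed.push({ time: current.time + quietMs, value: current.value });
    }
  });
  return passed;
}

const typed = [
  { time: 0, value: 'a' },
  { time: 200, value: 'b' },
  { time: 400, value: 'c' },
  { time: 1500, value: 'd' },
  { time: 1600, value: 'e' },
  { time: 3000, value: 'f' },
];

const debounced = debounceTimeline(typed, 700);
console.log(debounced.map((e) => e.value + '@' + e.time).join(', '));
// prints: c@1100, e@2300, f@3700
```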

Operators - takeWhile

It emits values while the takeWhile predicate returns true; once it returns false, the stream is completed and unsubscribed from.

const { Observable } = Rx;
const { takeWhile } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next(i++);
  }, 1000);
}).pipe(
  takeWhile(producer => producer < 5)
);

Operators - combineLatest

The combineLatest operator is somewhat similar to Promise.all. It combines several streams into one. Once every stream has emitted at least once, we get the latest values from each of them as an array. After that, any emission from any of the combined streams produces a new array of latest values.

const { combineLatest, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 750 ms
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

combineLatest(observer_1, observer_2).pipe(take(5));

Operators - zip

zip waits for a value from each stream and forms an array from these values. If some stream does not emit a value, the group is not formed.

const { zip, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 750 ms
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 500 ms
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
});

zip(observer_1, observer_2, observer_3).pipe(take(5));

Operators - forkJoin

forkJoin also combines streams, but it emits a value only when all the streams have completed.

const { forkJoin, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
}).pipe(take(3));

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 750 ms
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
}).pipe(take(5));

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 500 ms
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
}).pipe(take(4));

forkJoin(observer_1, observer_2, observer_3);
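The three combination rules are easy to mix up, so here is a timer-free sketch that applies each of them to the same assumed timeline of emissions from two streams. The functions `combineLatestSketch`, `zipSketch` and `forkJoinSketch` are hypothetical helpers that only model the described semantics; the end of the timeline stands in for "all streams have completed".

```javascript
// Each event is [streamName, value], listed in the order it was emitted.
const timeline = [
  ['a', 'a1'], ['b', 'b1'], ['b', 'b2'], ['a', 'a2'], ['b', 'b3'],
];
const names = ['a', 'b'];

// combineLatest: once every stream has emitted, output the latest value
// of each stream on every new event.
function combineLatestSketch(events, streamNames) {
  const latest = {};
  const out = [];
  for (const [name, value] of events) {
    latest[name] = value;
    if (streamNames.every((n) => n in latest)) {
      out.push(streamNames.map((n) => latest[n]));
    }
  }
  return out;
}

// zip: pair the 1st values of every stream, then the 2nd values, and so on.
function zipSketch(events, streamNames) {
  const buffers = Object.fromEntries(streamNames.map((n) => [n, []]));
  const out = [];
  for (const [name, value] of events) {
    buffers[name].push(value);
    if (streamNames.every((n) => buffers[n].length > 0)) {
      out.push(streamNames.map((n) => buffers[n].shift()));
    }
  }
  return out;
}

// forkJoin: a single output with the last value of every stream,
// produced only after all streams are done (here: end of the timeline).
function forkJoinSketch(events, streamNames) {
  const last = {};
  for (const [name, value] of events) last[name] = value;
  return streamNames.every((n) => n in last)
    ? [streamNames.map((n) => last[n])]
    : [];
}

const combined = combineLatestSketch(timeline, names);
const zipped = zipSketch(timeline, names);
const joined = forkJoinSketch(timeline, names);

console.log(JSON.stringify(combined));
// prints: [["a1","b1"],["a1","b2"],["a2","b2"],["a2","b3"]]
console.log(JSON.stringify(zipped)); // prints: [["a1","b1"],["a2","b2"]]
console.log(JSON.stringify(joined)); // prints: [["a2","b3"]]
```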

Operators - map

The map transformation operator converts each emitted value into a new one.

const { Observable } = Rx;
const { take, map } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next(i++);
  }, 1000);
}).pipe(
  map(x => x * 10),
  take(3)
);

Operators - share, tap

The tap operator lets you perform side effects, that is, any actions that do not affect the sequence.

The share utility operator can turn a cold stream into a hot one.

That's it for the operators. Let's move on to Subject.

Thinking out loud

And then I went to have some tea. These examples bore me :D

Subject family

The Subject family is a prime example of hot streams. These classes are a kind of hybrid that acts as both an observable and an observer at the same time. Since a subject is a hot stream, you need to unsubscribe from it. The main methods are next (push new data into the stream), error (signal an error and end the stream), complete (end the stream), subscribe and unsubscribe.

There are five types of subject.

Thinking out loud

On the stream I said there were 4, but it turned out that one more had been added. As the saying goes, live and learn.

Simple Subject: new Subject() is the simplest kind of subject. It is created without parameters. It passes on only the values that arrive after subscription.

BehaviorSubject: new BehaviorSubject(defaultData<T>) is, in my opinion, the most common kind of subject. It takes a default value as input. It always stores the data of the last emission and passes it on upon subscription. This class also has a useful value property (also available as the getValue method) that returns the current value of the stream.

ReplaySubject: new ReplaySubject(bufferSize?: number, windowTime?: number) optionally takes, as the first argument, the size of the buffer of values it will store, and, as the second, the time window during which those values are kept.

AsyncSubject: new AsyncSubject(). Nothing happens on subscription; a value is delivered only on complete, and it is only the last value of the stream.

WebSocketSubject: new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>). The documentation is silent about it, and I am seeing it for the first time myself. If you know what it does, write in and I will add it.
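The differences between the first four kinds are easiest to see from the point of view of a late subscriber. The classes below are hand-rolled sketches with hypothetical names; the real RxJS classes do much more (errors, unsubscription, time windows), so treat this only as a model of the behaviour described above.

```javascript
class SubjectSketch {
  constructor() {
    this.listeners = [];
  }
  subscribe(next) {
    this.listeners.push(next);
  }
  next(value) {
    this.listeners.forEach((listener) => listener(value));
  }
}

class BehaviorSubjectSketch extends SubjectSketch {
  constructor(initialValue) {
    super();
    this.value = initialValue;
  }
  subscribe(next) {
    next(this.value); // a new subscriber immediately gets the current value
    super.subscribe(next);
  }
  next(value) {
    this.value = value;
    super.next(value);
  }
}

class ReplaySubjectSketch extends SubjectSketch {
  constructor(bufferSize = Infinity) {
    super();
    this.bufferSize = bufferSize;
    this.buffer = [];
  }
  subscribe(next) {
    this.buffer.forEach((value) => next(value)); // replay stored values first
    super.subscribe(next);
  }
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    super.next(value);
  }
}

class AsyncSubjectSketch extends SubjectSketch {
  next(value) {
    this.last = value; // remember, but do not emit yet
    this.hasValue = true;
  }
  complete() {
    if (this.hasValue) super.next(this.last); // emit only the final value
  }
}

// Emit 1, 2, 3, subscribe late, then emit 4.
function lateSubscriber(subject) {
  const got = [];
  [1, 2, 3].forEach((v) => subject.next(v));
  subject.subscribe((v) => got.push(v));
  subject.next(4);
  return got;
}

const plainGot = lateSubscriber(new SubjectSketch());
const behaviorGot = lateSubscriber(new BehaviorSubjectSketch(0));
const replayGot = lateSubscriber(new ReplaySubjectSketch(2));

const asyncGot = [];
const asyncSubject = new AsyncSubjectSketch();
asyncSubject.subscribe((v) => asyncGot.push(v));
[1, 2, 3].forEach((v) => asyncSubject.next(v));
asyncSubject.complete();

console.log(plainGot.join());    // prints: 4
console.log(behaviorGot.join()); // prints: 3,4
console.log(replayGot.join());   // prints: 2,3,4
console.log(asyncGot.join());    // prints: 3
```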

Phew. Well, that covers everything I wanted to tell you today. I hope this information was helpful. You can find the reading list yourself in the useful information section.

Useful information

