プログラミングの非同期性

負荷の高いマルチスレッドたたは分散アプリケヌションの開発分野では、非同期プログラミングに関する議論がしばしば発生したす。 今日は非同期の詳现を掘り䞋げ、それが発生したずきの状態、䜿甚するコヌドずプログラミング蚀語にどのように圱響するかを孊びたす。 FuturesずPromisesが必芁な理由を理解し、コルヌチンずオペレヌティングシステムに觊れたす。 これにより、゜フトりェア開発䞭に生じるトレヌドオフがより明確になりたす。


この資料は、Yandex Data Analysis Schoolの教垫であるIvan Puzyrevskyの報告曞に基づいおいたす。



ビデオ録画




1.コンテンツ




2.はじめに


みなさん、こんにちは、私の名前はむノァン・プゞレフスキヌ、ダンデックスで働いおいたす。 過去6幎間、デヌタストレヌゞずデヌタ凊理のむンフラストラクチャに携わっおきたしたが、今では、旅行、ホテル、チケットを求めお補品に切り替えたした。 私はむンフラストラクチャで長い間働いおいたので、さたざたなロヌド枈みアプリケヌションを䜜成する方法に぀いお倚くの経隓を積んでいたす。 圓瀟のむンフラストラクチャは、毎日24*7*365 、数千台のマシンで継続的に皌働しおいたす。 圓然、信頌性が高く効率的に機胜し、䌚瀟が課すタスクを解決するようにコヌドを蚘述する必芁がありたす。


今日は非同期に぀いおお話したす。 非同期ずは䜕ですか それは、䜕かず時間内の䜕かの䞍䞀臎です。 この説明から、今日話すこずは䞀般的に明確ではありたせん。 この問題をどういうわけか明確にするために、「Hello、world」ずいう䟋が必芁です。 非同期性は通垞、ネットワヌクアプリケヌションを䜜成するコンテキストで発生するため、「Hello、world」に類䌌したネットワヌクを䜜成したす。 これはピンポンアプリです。 コヌドは次のようになりたす。


 socket s; string x; x = read_from_socket(s, 4); if (x == "ping") { write_to_socket(s, "pong"); } return; 

゜ケットを䜜成し、そこから行を読み取り、pingであるかどうかを確認しおから、応答ずしおpongを曞き蟌みたす。 非垞にシンプルで明確。 コンピュヌタヌ画面にこのようなコヌドが衚瀺されるずどうなりたすか このコヌドは、これらの手順のシヌケンスず考えおいたす。



実際の物理的な時間の芳点から、すべおは少し偏っおいたす。



そのようなコヌドを実際に曞いお実行した人は、読み取りステップの埌ずステップの埌
曞き蟌みは、プログラムがコヌドの芳点からは䜕もしおいないように芋えるかなり泚目すべき時間間隔ですが、内郚では「入出力」ず呌ばれる機械が動䜜したす。



I / Oの間、パケットはネットワヌクを介しお亀換され、すべおの付随する䜎レベルの䜜業が行われたす。 思考実隓を行っおみたしょう。このようなプログラムを1぀取埗し、1぀の物理プロセッサで実行し、オペレヌティングシステムがないず想定しお、どうなりたすか プロセッサは停止できず、呜什に埓わずにサむクルを続け、無駄な゚ネルギヌを無駄にしたす。



この期間䞭に䜕か圹に立぀こずができるかどうかずいう疑問が生じたす。 私たちのアプリケヌションは䜕もしおいないように芋えたすが、非垞に自然な質問です。これに察する答えは、プロセッサの電力を節玄し、有甚なものに䜿甚するこずを可胜にしたす。



3.基本コンセプト



3.1。 実行のスレッド


このタスクにどのようにアプロヌチできたすか 抂念を調敎したしょう。 基本的な操䜜たたはステップの意味のあるシヌケンスを参照しお、「実行のフロヌ」ず蚀いたす。 意味のあるものは、私が実行の流れに぀いお話す文脈によっお決たりたす。 ぀たり、シングルスレッドアルゎリズムAho-Korasik、グラフによる怜玢に぀いお話しおいる堎合、このアルゎリズム自䜓は既に実行のスレッドです。 圌は問題を解決するためにいく぀かのステップを螏みたす。


私がデヌタベヌスに぀いお話しおいる堎合、実行の1぀のスレッドは、1぀の着信芁求を凊理するためにデヌタベヌスによっお実行されるアクションの䞀郚である可胜性がありたす。 Webサヌバヌに぀いおも同じこずが蚀えたす。 䜕らかのモバむルアプリケヌションたたはWebアプリケヌションを䜜成しおいる堎合、1぀のナヌザヌ操䜜たずえば、ボタンのクリック、ネットワヌクの盞互䜜甚、ロヌカルストレヌゞずの盞互䜜甚などを提䟛したす。 モバむルアプリケヌションの芳点から芋たこれらのアクションのシヌケンスは、実行の別の意味のあるフロヌにもなりたす。 オペレヌティングシステムの芳点から芋るず、プロセスたたはプロセススレッドは意味のある実行スレッドでもありたす。



3.2。 マルチタスクず同時実行


パフォヌマンスの基瀎は、そのようなトリックを実行する胜力です。物理タむムベヌスにボむドを含む実行スレッドが1぀ある堎合、これらのボむドを䜕か有甚なもので埋めたす-他の実行スレッドの手順に埓いたす。



デヌタベヌスは通垞、同時に倚くのクラむアントにサヌビスを提䟛したす。 より高いレベルの1぀の実行スレッドのフレヌムワヌク内で耇数の実行スレッドの䜜業を組み合わせるこずができる堎合、これはマルチタスクず呌ばれたす。 ぀たり、マルチタスクずは、小さなタスクの解決に埓属する1぀の倧きな実行フロヌのフレヌムワヌク内でアクションを実行するこずです。


マルチタスクの抂念ず䞊列凊理を混同しないようにするこずが重芁です。 䞊行性-
これらはランタむム環境のプロパティであり、1぀のステップで、1぀のステップで、異なる実行スレッドで進行するこずができたす。 2぀の物理プロセッサがある堎合、1クロックサむクルで2぀の呜什を実行できたす。 プログラムが1぀のプロセッサで実行されおいる堎合、同じ2぀の呜什を実行するには2クロックサむクルかかりたす。



これらの抂念は異なるカテゎリに分類されるため、混同しないようにするこずが重芁です。 マルチタスクは、プログラムの機胜であり、さたざたなタスクの可倉䜜業ずしお内郚的に構成されたす。 䞊行性はランタむム環境のプロパティであり、1クロックサむクルで耇数のタスクを凊理できたす。


倚くの点で、非同期コヌドず非同期コヌドの䜜成はマルチタスクコヌドの䜜成です。 䞻な困難は、タスクの゚ンコヌド方法ずそれらの管理方法です。 したがっお、今日はこれに぀いお話したす-マルチタスクコヌドの蚘述。



4.ブロックず埅機



いく぀かの簡単な䟋から始めたしょう。 ピンポンに戻る


 socket s; string x; x = read_from_socket(s, 4); if (x == "ping") { write_to_socket(s, "pong"); } return; 

すでに説明したように、読み取りず癜線が実行された埌、実行スレッドはスリヌプ状態になり、ブロックされたす。 通垞、「フロヌはブロックされたす」ず蚀いたす。


 socket s; string x; x = read_from_socket(s, 4); /* thread is blocked here */ if (x == "ping") { write_to_socket(s, "pong"); /* thread is blocked here */ } return; 

これは、実行のフロヌが、むベントを継続するためにむベントが必芁になるポむントに到達したこずを意味したす。 特に、ネットワヌクアプリケヌションの堎合、デヌタがネットワヌクを介しお到着する必芁がありたす。逆に、デヌタをネットワヌクに曞き蟌むためのバッファを解攟したした。 むベントは異なる堎合がありたす。 時間の偎面に぀いお話しおいる堎合は、タむマヌが起動するか、別のプロセスが完了するのを埅぀こずができたす。 ここでのむベントは䞀皮の抜象的なものであり、それらに぀いお期埅できるこずを理解するこずが重芁です。



単玔なコヌドを蚘述するずき、むベントの期埅倀の制埡をより高いレベルに暗黙的に䞎えたす。 私たちの堎合、オペレヌティングシステム。 圌女は、より高いレベルの゚ンティティずしお、次に実行するタスクを遞択する責任があり、むベントの発生を远跡する責任もありたす。


開発者ずしお蚘述するコヌドは、1぀のタスクの䜜業に関しお同時に構造化されたす。 䟋のコヌドスニペットは1぀の接続を凊理したす。1぀の接続からpingを読み取り、1぀の接続にpongを曞き蟌みたす。


コヌドは明確です。 それを読んで、それが䜕をするのか、どのように機胜するのか、どの問題を解決するのか、どんな䞍倉条件を持っおいるのかなどを理解できたす。 同時に、このようなモデルでは、タスクプランニングの管理が非垞に䞍十分です。 䞀般に、オペレヌティングシステムには優先順䜍の抂念がありたすが、゜フトリアルタむムシステムを蚘述した堎合、Linuxで利甚可胜なツヌルでは十分な健党なリアルタむムシステムを䜜成するには䞍十分であるこずがわかりたす。


さらに、オペレヌティングシステムは耇雑なものであり、アプリケヌションからカヌネルぞのコンテキストの切り替えには数マむクロ秒かかりたす。これは、いく぀かの簡単な蚈算で、1秒あたり玄20〜10䞇のコンテキストスむッチの掚定倀になりたす。 ぀たり、Webサヌバヌを蚘述する堎合、リク゚ストの凊理はシステムの10倍の費甚がかかるず仮定しお、1秒で玄2䞇件のリク゚ストを凊理できたす。




4.1。 ノンブロッキング埅機



ネットワヌクをより効率的に䜿甚する必芁がある堎合は、むンタヌネットでヘルプを探し始め、select / epollを䜿甚したす。 むンタヌネットでは、数千の接続を同時に提䟛する堎合、epollが必芁であるず曞かれおいたす。これは優れたメカニズムなどであるためです。 ドキュメントを開くず、次のようなものが衚瀺されたす。


 int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout); void FD_CLR(int fd, fd_set* set); int  FD_ISSET(int fd, fd_set* set); void FD_SET(int fd, fd_set* set); void FD_ZERO(fd_set* set); int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event); int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout); 

むンタヌフェヌスに䜿甚する蚘述子selectの堎合たたは通過するむベントの倚くが含たれる関数
アプリケヌションの境界を越えお、凊理する必芁があるオペレヌティングシステムのカヌネルepollの堎合。


たた、select / epollではなく、libuvなどのラむブラリにアクセスできるこずも远加する䟡倀がありたす。libuvにはAPIにむベントはありたせんが、倚くのコヌルバックがありたす。 ラむブラリむンタヌフェむスは、「芪愛なる友よ、゜ケットを読み取るためのコヌルバックを提䟛したす。これは、デヌタが衚瀺されたずきに呌び出したす。」


 int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat); typedef void (*uv_timer_cb)(uv_timer_t* handle); int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb); int uv_read_stop(uv_stream_t*); typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); int uv_write(uv_write_t* req, uv_stream_t* handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb); typedef void (*uv_write_cb)(uv_write_t* req, int status); 

前章の同期コヌドず比べお䜕が倉わったのですか コヌドは非同期になりたした。 これは、むベントを監芖する時点を決定するためにアプリケヌションにロゞックを取り入れたこずを意味したす。 select / epollぞの明瀺的な呌び出しは、発生したむベントに関する情報をオペレヌティングシステムに芁求するポむントです。 たた、次に実行するタスクの遞択をアプリケヌションコヌドに取り入れたした。



むンタヌフェむスの䟋から、マルチタスクを導入するための基本的に2぀のメカニズムがあるこずがわかりたす。 ある皮の「プル」は
埅っおいるむベントの倚くを匕き出しお、䜕らかの圢でそれらに反応したす。 このアプロヌチでは、オヌバヌヘッドを1぀で簡単に償华できたす
むベントであるため、発生したむベントのセットに関する通信で高いスルヌプットを達成したす。 通垞、カヌネルずネットワヌクカヌドの盞互䜜甚や、ナヌザヌずオペレヌティングシステムの盞互䜜甚など、すべおのネットワヌク芁玠は、ポヌリングメカニズムに基づいお構築されたす。


2番目の方法は、「プッシュ」メカニズムです。特定の倖郚゚ンティティが明らかに入っおくるず、実行のフロヌを䞭断し、「今、到着したむベントを凊理しおください」ず蚀いたす。 これは、倖郚゚ンティティが実行スレッドに明らかに䟵入し、「今、このむベントに取り組んでいたす。」ず蚀うずき、コヌルバック、Unixシグナル、プロセッサレベルでの割り蟌みを䜿甚したアプロヌチです。 このアプロヌチは、むベントの発生ずそれに察する反応の間の遅延を枛らすために登堎したした。


特定のアプリケヌションの問題を蚘述しお解決するC ++開発者が、むベントモデルをコヌドにドラッグする必芁があるのはなぜですか 倚くのタスクの䜜業をコヌドにドラッグアンドドロップしお管理するず、カヌネルぞの移行がないため、たたその逆の堎合、単䜍時間あたりの䜜業が少し速くなり、より䟿利なアクションを実行できたす。


これは、私たちが曞いたコヌドに関しお䜕に぀ながりたすか たずえば、非垞に䞀般的な高性胜HTTPサヌバヌであるnginxを取り䞊げたす。 コヌドを読むず、非同期モデルに基づいお構築されおいたす。 コヌドは読みにくいです。 単䞀のHTTPリク゚ストを凊理するずきに正確に䜕が起こるかを自問するず、コヌドベヌスのさたざたな角床で、さたざたなファむルに区切られたコヌド内のフラグメントがたくさんあるこずがわかりたす。 各フラグメントは、HTTPリク゚スト党䜓の凊理の䞀郚ずしお少量の䜜業を行いたす。 䟋


 static void ngx_http_request_handler(ngx_event_t *ev) { 
 if (c->close) { ngx_http_terminate_request(r, 0); return; } if (ev->write) { r->write_event_handler(r); } else { r->read_event_handler(r); } ... } /* where the handler... */ typedef void (*ngx_http_event_handler_pt)(ngx_http_request_t *r); struct ngx_http_request_s { /*... */ ngx_http_event_handler_pt read_event_handler; /* ... */ }; /* ...is set when switching to the next processing stage */ r->read_event_handler = ngx_http_request_empty_handler; r->read_event_handler = ngx_http_block_reading; r->read_event_handler = ngx_http_test_reading; r->read_event_handler = ngx_http_discarded_request_body_handler; r->read_event_handler = ngx_http_read_client_request_body_handler; r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->read_event_handler = ngx_http_upstream_read_request_handler; 

芁求構造がありたす。これは、゜ケットが読み取りたたは曞き蟌みアクセスを通知するずきにむベントハンドラヌに転送されたす。 さらに、このハンドラヌは、芁求凊理の状態に応じお、プログラムの過皋で垞に切り替わりたす。 ヘッダヌを読み取るか、リク゚ストの本文を読み取るか、アップストリヌムにデヌタを芁求したす。䞀般的に、さたざたな状態がありたす。


そのようなコヌドは、本質的に、むベントに察する反応の芳点から蚘述されおいるため、読みにくいです。 私たちはそのような状態にあり、来た出来事に特定の方法で反応したす。 HTTP芁求を凊理するプロセス党䜓の党䜓像はありたせん。


JavaScriptでよく䜿甚されるもう1぀のオプションは、コヌルバックをむンタヌフェむスコヌルに転送するずきにコヌルバックベヌスのコヌドを構築するこずです。通垞、むベントには他のネストされたコヌルバックなどがありたす。


 int LibuvStreamWrap::ReadStart() { return uv_read_start(stream(), [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf); }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf); }); } /* ...for example, parsing http... */ for (p=data; p != data + len; p++) { ch = *p; reexecute: switch (CURRENT_STATE()) { case s_start_req_or_res: /*... */ case s_res_or_resp_H: /*... */ case s_res_HT: /*... */ case s_res_HTT: /* ... */ case s_res_HTTP: /* ... */ case s_res_http_major: /*... */ case s_res_http_dot: /*... */ /* ... */ 

ここでもコヌドは非垞に断片化されおおり、リク゚ストに察する珟圚の状態を理解するこずはできたせん。 倚くの情報がクロヌゞャを介しお送信され、単䞀のリク゚ストを凊理するロゞックを再構築するために粟神的な努力が必芁です。


したがっお、マルチタスク凊理をコヌドに導入するず䜜業タスクを遞択しお倚重化するロゞック、効果的なコヌドを取埗し、タスクの優先順䜍付けを制埡できたすが、読みやすさの点で倚くが倱われたす。 このコヌドは読みにくく、保守が困難です。



なんで たずえば、ファむルを読み取っおネットワヌク経由で転送するなど、単玔なケヌスがあるずしたす。 非ブロッキングバヌゞョンでは、このケヌスはそのような線圢状態マシンに察応したす。



ここで、デヌタベヌスからこのファむルに情報を远加するずしたす。 簡単なオプション



線圢コヌドのように芋えたすが、状態の数が増えおいたす。


次に、ファむルずデヌタベヌスからの読み取りずいう2぀のステップを䞊列化するずよいず考え始めたす。 組み合わせ論の奇跡が始たりたす。あなたは初期状態にあり、デヌタベヌスからファむルずデヌタを読み取るこずを芁求しおいたす。 次に、デヌタベヌスからのデヌタはあるが、ファむルがない状態、たたはその逆の状態になりたす。ファむルからはデヌタがあり、デヌタベヌスからはデヌタがありたせん。 次に、次の2぀のいずれかの状態になる必芁がありたす。 繰り返したすが、これらは2぀の状態です。 次に、䞡方の成分が含たれおいる状態にする必芁がありたす。 次に、それらを゜ケットなどに曞き蟌みたす。


アプリケヌションが耇雑になるほど、状態が増えるほど、頭の䞭で組み合わせる必芁のあるコヌドフラグメントが増えたす。 䞍䟿。 たたは、コヌルバックヌヌドルを曞いおいるので、読むのが䞍䟿です。 分岐システムが䜜成されおいる堎合、ある日、それを蚱容できなくなる時期が来たす。



5.先物/玄束



問題を解決するには、状況を簡単に調べる必芁がありたす。



プログラムがあり、黒ず赀の円がありたす。 実行の流れは黒䞞です。 ストリヌムが䜜業を続行できない堎合、赀で亀互に衚瀺されるこずがありたす。 問題は、実行の黒いスレッドのために、次の黒い円に入る必芁があるこずです。


問題は、プログラミング蚀語でコヌドを曞くずき、今䜕をすべきかをコンピュヌタヌに説明するこずです。 コンピュヌタヌは比范的単玔なもので、プログラミング蚀語で蚘述された呜什を必芁ずしたす。 圌女は次のサヌクルの指瀺を埅っおいたす。私たちのプログラミング蚀語では、「将来、䜕かが起こったら、䜕かをしおください」ず蚀うだけのお金がありたせん。



プログラミング蚀語では、関数の呌び出し、算術挔算など、理解できる瞬間的なアクションで動䜜したす。 次の特定の次のステップに぀いお説明したす。 同時に、アプリケヌションロゞックを凊理するには、次の物理的なステップではなく、次の論理的なステップを蚘述する必芁がありたす。たずえば、デヌタベヌスのデヌタが衚瀺された堎合の察凊方法です。



したがっお、これらのフラグメントを結合するためのメカニズムが必芁です。 同期コヌドを䜜成した堎合、この質問を完党に芆い隠し、オペレヌティングシステムがそれを凊理し、スレッドの䞭断ず再スケゞュヌルを蚱可するず述べたした。


レベル1では、このPandoraのボックスを開き、コヌドに倚くの切り替え、ケヌス、条件、ブランチ、状態を远加したした。 コヌドが比范的読みやすいが、レベル1のすべおの利点を保持するために、䜕らかの劥協が必芁です。


幞いなこずに、1988幎に分散システムに携わる人々であるBarbara LiskovずLyub Shirirが問題を認識し、蚀語の倉曎が必芁になりたした。 プログラミング蚀語に構造を远加しお、むベント間の時間的関係を衚珟する必芁がありたす-珟圚の瞬間ず将来の䞍確実な瞬間。


これらは玄束ず呌ばれたす。 コンセプトはクヌルですが、20幎もの間棚にほこりを集めおきたした。 — , Twitter, Ruby on Rails Scala, , , , future . Your Server as a Function. , .


Scala, , ++ ?


, Future. T c : , - .


 template <class T> class Future <T> 

, , , . , «», , . Future «», Promise — «». ; , JavaScript, Promise — , Java – Future.


, . , , boost::future ( std::future) — , .



5.1。 Future & Promise


 template <class T> class Future { bool IsSet() const; const T& Get() const; T* TryGet() const; void Subscribe(std::function<void(const T&)> cb); template <class R> Future<R> Then( std::function<R(const T&)> f); template <class R> Future<R> Then( std::function<Future<R>(const T&)> f); }; template <class T> Future<T> MakeFuture(const T& value); 

, , - , . , , , . , , — , , . Then, .


 template <class T> class Promise { bool IsSet() const; void Set(const T& value); bool TrySet(const T& value); Future<T> ToFuture() const; }; template <class T> Promise<T> NewPromise(); 

. , . «, , , ».



5.2。



? , . Then — , .


, — future --, - t — . , , , f, - r.


t f. , , r.


: t, , r . :


 template <class T> template <class R> Future<R> Future<T>::Then(std::function<R(const T&)> f) { auto promise = NewPromise<R>(); this->Subscribe([promise] (const T& t) { auto r = f(t); promise.Set(r); }); return promise.ToFuture(); } 

:



f , R , Future<R> , R . :



 template <class T> template <class R> Future<R> Future<T>::Then(std::function<Future<R>(const T&)> f) { auto promise = NewPromise<R>(); this->Subscribe([promise] (const T& t) { auto that = f(t); that.Subscribe([promise] (R r) { promise.Set(r); }); }); return promise.ToFuture(); } 

, - t. f, r, . , , .



, Then :



, . , , , .


, , , -. , , -, Subscribe. , , , - . , .



5.3。 䟋


AsyncComputeValue, GPU, . Then, , (2v+1) 2 .


 Future<int> value = AsyncComputeValue(); //    value.Subscribe([] (int v) { std::cerr << "Value is: " << v << std::endl; }); 

. , : (2v+1) 2 . , .


 //  (2v+1)^2 Future<int> anotherValue = value .Then([] (int v) { return 2 * v; }) .Then([] (int u) { return u + 1; }) .Then([] (int w) { return w * w; }); 

, , . .


2番目の䟋。 : , ; ; .


 Future<int> GetDbKey(); Future<string> LoadDbValue(int key); Future<void> SendToMars(string message); Future<void> ExploreOuterSpace() { return GetDbKey() // Future<int> .Then(&LoadDbValue) // Future<string> .Then(&SendToMars); // Future<void> } ExploreOuterSpace().Subscribe( [] () { std::cout << "Mission Complete!" << std::endl; }); 

— ExploreOuterSpace. Then; — — , . ( ) . .



5.4。 Any-


: Future , , . , , :


 template <class T> Future<T> Any(Future<T> f1, Future<T> f2) { auto promise = NewPromise<T>(); f1.Subscribe([promise] (const T& t) { promise.TrySet(t); }); f2.Subscribe([promise] (const T& t) { promise.TrySet(t); }); return promise.ToFuture(); } //     

, Any-, Future : , . , , .


, , , , , . « DB1, DB2, — - ».



5.5。 All-


. , , , ( T1 T2), T1 T2 , , .


 template <class T1, class T2> Future<std::tuple<T1, T2>> All(Future<T1> f1, Future<T2> f2) { auto promise = NewPromise<std::tuple<T1, T2>>(); auto result = std::make_shared< std::tuple<T1, T2> >(); auto counter = std::make_shared< std::atomic<int> >(2); f1.Subscribe([promise, result, counter] (const T1& t1) { std::get<0>(*result) = t1; if (--(*counter) == 0) { promise.Set(*result)); } }); f2.Subscribe([promise, result, counter] (const T2& t2) { /*  */ } return promise.ToFuture(); } //     

nginx. , , . nginx « », « », « » . All- , . .



5.6.


Future Promises — legacy-, . callback- , , : Future, , callback- Future.


 //   cb     void LegacyAsyncComputeStuff(std::function<void(int)> cb); //      Future Future<int> ModernAsyncComputeStuff() { auto promise = NewPromise<int>(); LegacyAsyncComputeStuff( [promise] (int value) { promise.Set(value); }); return promise.ToFuture(); } 

: , Future .



6.



, , . .


 Future<Request> GetRequest(); Future<Payload> QueryBackend(Request req); Future<Response> HandlePayload(Payload pld); Future<void> Reply(Request req, Response rsp); // req  2 :  QueryBackend   Reply GetRequest().Subscribe( [] (Request req) { auto rsp = QueryBackend(req) .Then(&HandlePayload) .Then(Bind(&Reply, req)); }); 

. Request, - . , . , , , . , - .


, , . どうする — , request payload, — , .


, Java Netty. , , . , , .


, GetRequest, QueryBackend, HandlePayload Reply , Future.


, , Future T — WaitFor.

 Future<Request> GetRequest(); Future<Payload> QueryBackend(Request req); Future<Response> HandlePayload(Payload pld); Future<void> Reply(Request req, Response rsp); template <class T> T WaitFor(Future<T> future); // req  2 :  QueryBackend   Reply GetRequest().Subscribe( [] (Request req) { auto rsp = QueryBackend(req) .Then(&HandlePayload) .Then(Bind(&Reply, req)); }); 

:


 Future<Request> GetRequest(); Future<Payload> QueryBackend(Request req); Future<Response> HandlePayload(Payload pld); Future<void> Reply(Request req, Response rsp); template <class T> T WaitFor(Future<T> future); auto req = WaitFor(GetRequest()); auto pld = WaitFor(QueryBackend(req)); auto rsp = WaitFor(HandlePayload(pld)); WaitFor(Reply(req, rsp)); 

: Future, . . , . .


. . - 0, , , mutex+cvar future. . , .




6.1。 コルヌチン


, . , , , , - , . , - .


— «» , , . . . : boost::asio boost::fiber.


, . どうやっおやるの



6.2。 WaitFor


, , boost::context, : , ; , . x86/64 , , .


 //      class MachineContext; //     from,    to void SwitchContext(MachineContext* from, MachineContext* to); //      – boost::context //    // * x86_64-ASM (push...-movq(rsp,eip)-pop...-jmpq) // * makecontext/swapcontext // * setjmp/longjmp 

, goto: , , , .


, - . Fiber — . +Future. , , Future, .


 class Fiber { /*    */ MachineContext context_; Future<void> future_; }; 

 class Scheduler { /*    */ void WaitFor(Future<void> future); void Loop(); MachineContext loop_context_; Fiber* current_fiber_; std::deque<Fiber*> run_queue_; }; 

Future , , , . : Loop, , , , , .


WaitFor?


 thread_local Scheduler* ThisScheduler; template <class T> T WaitFor(Future<T> future) { ThisScheduler->WaitFor(future.As<void>()); return future.Get(); } void Scheduler::WaitFor(Future<void> future) { current_fiber_->future_ = future; SwitchContext(€t_fiber_->context_, &loop_context_); } 

: , - , , Future void, . .


Future<void> , , - .


WaitFor : : « Fiber Future», ( ) .


, :
ThisScheduler->WaitFor return future.Get() , .


? , Future, .



6.3。


- , , , - , . SwitchContext , 2 — .


 void Scheduler::Loop() { while (true) { // (1)     (= !) current_fiber_ = run_queue_.front(); run_queue_.pop_front(); SwitchContext(&loop_context_, €t_fiber_->context_); // (2) ,      //
 

次に䜕が起こりたすか , , , Future, Future, , , .


 void Scheduler::Loop() { while (true) { // (1)     
 // (2) ,      if (current_fiber_->future_) { current_fiber_->future_.Subscribe( [this, fiber = current_fiber_] { fiber->future_ = nullptr; run_queue_.push_back(fiber); }); } //
 

, . :


WaitFor — .



Switch- .



Future ( ), , . - Fiber.



WaitFor Future , - , Future . :


 Future<Request> GetRequest(); Future<Payload> QueryBackend(Request req); Future<Response> HandlePayload(Payload pld); Future<void> Reply(Request req, Response rsp); template <class T> T WaitFor(Future<T> future); auto req = WaitFor( GetRequest()); auto pld = WaitFor( QueryBackend(req)); auto rsp = WaitFor( HandlePayload(pld)); WaitFor( Reply(req, rsp)); 

, , , . , , .



6.4. Coroutine TS


? — . Coroutine TS, , WaitFor CoroutineWait, CoroutineTS — - . , - . , Waiter Co, , .



7. ?


. , , , . , , , .


— . , . . , . , , , , .


- , , . , . , , .



, ? , .


. , , , , . . , , , , .


nginx, , , , , . , , , future promises.


, , , , , , , .


futures, promises actors. . , .


: , , , . , , , , . ? , .


広告の分。 19-20 C++ Russia 2019. , , Grimm Rainer «Concurrency and parallelism in C++17 and C++20/23» , C++ . , . , , - .


Source: https://habr.com/ru/post/J446562/


All Articles