非同期2:ポータルを介したテレポート



非同期性に関する記事の続きにたどり着くまで、1年も経っていませんでした。 この記事では、非同期に関する最初の記事[1]のアイデアを発展させています。 かなり複雑なタスクについて説明します。その例は、さまざまな非自明なシナリオでコルーチンを使用する力と柔軟性を明らかにします。 結論として、競合状態(競合状態)の2つの問題と、小さいながら非常に快適なボーナスを検討します。

この間ずっと、 最初の記事はすでに検索トップになっています。



さあ、行こう!



挑戦する


元の文言は簡単で、次のように聞こえます。
ネットワークを介して重いオブジェクトを取得し、UIに転送します。

UIに「興味深い」要件を追加して、タスクを複雑にします。
  1. アクションは、イベントを介してUIスレッドから生成されます。
  2. 結果はUIに返される必要があります。
  3. UIをブロックしたくないので、操作は非同期で実行する必要があります。

オブジェクトを受け取るための「面白い」条件を追加します。
  1. ネットワーク操作は遅いため、オブジェクトをキャッシュします。
  2. 再起動後にオブジェクトが保存されるように、永続的なキャッシュが必要です。
  3. 永続的なデバイスは低速であるため、オブジェクトをより速く返すために、オブジェクトをメモリに追加でキャッシュします。

パフォーマンスの側面を見てみましょう。
  1. キャッシュ(永続的なストレージとメモリ)に、並列ではあるが連続したレコードはないことが望ましいでしょう。
  2. キャッシュからの読み取りも並行処理する必要がありますが、一方のキャッシュで値が見つかった場合は、他方のキャッシュからの応答を待たずにすぐに使用します。
  3. ネットワーク操作がキャッシュに干渉しないようにする必要があります。つまり、たとえばキャッシュが鈍い場合、ネットワークの相互作用に影響を与えません。
  4. 限られた数のスレッドで多数の接続をサポートしたいと考えています。つまり、リソースに対してより慎重な態度をとるために非同期ネットワーク相互作用が必要です。

ロジックを悪化させるには:
  1. 操作をキャンセルする必要があります。
  2. さらに、ネットワークを介してオブジェクトを受信した場合、キャッシュを更新するための後続の操作にキャンセルを適用しないでください。つまり、アクションのセットに対して「キャンセルのキャンセル」を実装する必要があります。

筋金入りではないと考えた場合は、さらに要件を追加します。
  1. 操作のタイムアウトを実装する必要があります。 さらに、タイムアウトは、操作全体と一部の両方のタイムアウトにする必要があります。 例:
    • すべてのネットワーク相互作用のタイムアウト:接続、要求、応答。
    • ネットワークの相互作用やキャッシュの操作を含む、操作全体のタイムアウト。
  2. 操作スケジューラは、独自のものでも外部のものでもかまいません(たとえば、UIスレッドのスケジューラ)。
  3. スレッドをブロックする操作はありません。 これは、ミューテックスやその他の同期手段がスレッドをブロックするため、使用が禁止されていることを意味します。



これで十分です。 誰かがこれを行う方法についてすぐに答えを得た場合、私はこの決定に精通して喜んでいるでしょう。 さて、以下に私の解決策を提案します:その中で、たとえばキャッシュと永続性の実装ではなく、ロックとスケジューラの要件を考慮した特定の並列および非同期の相互作用に重点が置かれることは明らかです。

解決策


このソリューションでは、次のモデルを使用します。



何が起こっているかの本質を説明します。
  1. UIMem CacheDisk CacheNetworkは、新しく作成されたHandler対応する操作を実行するオブジェクトです。
  2. Handlerは単純なシーケンスを実行します。
    • 並行して、 Mem CacheおよびDisk CacheオブジェクトのMem Cacheからデータを取得する操作を開始します。 成功した場合、つまり、少なくとも1つのキャッシュから見つかった結果で応答を受信すると、すぐに結果を返します。 (図のように)障害が発生した場合、実行は継続されます。
    • 両方のキャッシュからの結果の欠如を待った後、 HandlerNetworkを呼び出してNetwork経由でオブジェクトを取得します。 これを行うには、サービスにconnectconnect )、要求をsendsend )、応答を受信( receive )します。 このような操作は非同期で実行され、他のネットワークの相互作用をブロックしません。
    • Networkコンポーネントから受信したオブジェクトは、両方のキャッシュに並行して書き込まれます。
    • キャッシュへの書き込みの完了を待った後、値はUIストリームに返されます。
  3. このプログラムには、次のプランナーとそれに関連するオブジェクトが含まれています。
    • 非同期Handler操作を開始し、結果が返されるUIスレッド。
    • Mem CacheDisk Cacheなど、すべての基本的な操作が実行される共通のスレッドプール。
    • Networkのネットワークスレッドプール。 メインプールの負荷がネットワークスレッドプールに影響しないように、メインスレッドプールとは別に作成されます。

前に書いたように、非同期の面ではこれは実際には重要ではないため、最も簡単な方法でオブジェクトを実装します。

 // stub:   struct DiskCache { boost::optional<std::string> get(const std::string& key) { JLOG("get: " << key); return boost::optional<std::string>(); } void set(const std::string& key, const std::string& val) { JLOG("set: " << key << ";" << val); } }; //   : - struct MemCache { boost::optional<std::string> get(const std::string& key) { auto it = map.find(key); return it == map.end() ? boost::optional<std::string>() : boost::optional<std::string>(it->second); } void set(const std::string& key, const std::string& val) { map[key] = val; } private: std::unordered_map<std::string, std::string> map; }; struct Network { // ... //     std::string get(const std::string& key) { net::Socket socket; JLOG("connecting"); socket.connect(address, port); //   -   Buffer sz(1, char(key.size())); socket.write(sz); //  -  socket.write(key); //    socket.read(sz); Buffer val(size_t(sz[0]), 0); //    socket.read(val); JLOG("val received"); return val; } private: std::string address; int port; // ... }; // UI-:   UI struct UI : IScheduler { void schedule(Handler handler) { //    UI- // ... } void handleResult(const std::string& key, const std::string& val) { TLOG("UI result inside UI thread: " << key << ";" << val); // TODO: add some actions } }; 

原則として、すべてのUIフレームワークには、UIスレッドで必要なアクションを実行できるメソッドが含まれています(たとえば、Android: Activity.runOnUiThread 、Ultimate ++: PostCallback 、Qt:シグナルスロットメカニズムを使用)。 これらのメソッドは、 UI::scheduleメソッドの実装で使用する必要があります。

経済全体の初期化は、命令的なスタイルで行われます。

 //       ThreadPool cpu(3, "cpu"); //       ThreadPool net(2, "net"); //       Alone diskStorage(cpu, "disk storage"); //       Alone memStorage(cpu, "mem storage"); //     scheduler<DefaultTag>().attach(cpu); //       service<NetworkTag>().attach(net); //       service<TimeoutTag>().attach(cpu); //       portal<DiskCache>().attach(diskStorage); //       portal<MemCache>().attach(memStorage); //       portal<Network>().attach(net); UI& ui = single<UI>(); //  UI-  UI- portal<UI>().attach(ui); 

ユーザーアクションのUIスレッドでは、次のことを実行します。

 go([key] { // timeout   : 1=1000  Timeout t(1000); std::string val; //      boost::optional<std::string> result = goAnyResult<std::string>({ [&key] { return portal<DiskCache>()->get(key); }, [&key] { return portal<MemCache>()->get(key); } }); if (result) { //   val = std::move(*result); JLOG("cache val: " << val); } else { //     //     { //    : 0.5=500  Timeout tNet(500); val = portal<Network>()->get(key); } JLOG("net val: " << val); //         //  ( )  EventsGuard guard; //      goWait({ [&key, &val] { portal<DiskCache>()->set(key, val); }, [&key, &val] { portal<MemCache>()->set(key, val); } }); JLOG("cache updated"); } //   UI    portal<UI>()->handleResult(key, val); }); 



使用されるプリミティブの実装


注意深い読者が指摘したように、私はかなりの数のプリミティブを使用しましたが、その実装は推測することしかできません。 したがって、以下はアプローチと使用されるクラスの説明です。 これにより、ポータルとは何か、それらの使用方法が明確になり、テレポーテーションに関する質問にも答えられると思います。



保留中のプリミティブ


最も単純な待機プリミティブから始めましょう。

goWait:非同期操作を開始し、完了を待つ


したがって、シードのために、操作を非同期的に開始し、その完了を待つ関数を実装します。

 void goWait(Handler); 

もちろん、実装として、現在のコルーチンでハンドラーを実行することは非常に適しています。 しかし、より複雑なシナリオではこれは適切ではないため、この機能を実装するには、新しいコルーチンを作成します。

 void goWait(Handler handler) { deferProceed([&handler](Handler proceed) { go([proceed, &handler] { //    handler(); proceed(); //    }); }); } 

ここで何が起こっているのか簡単に説明します。 goWait関数の入力に、新しいコルーチンで起動されるハンドラーを取得します。 必要な操作を実行するには、次のように実装されているdeferProceed関数を使用します。

 typedef std::function<void(Handler)> ProceedHandler; void deferProceed(ProceedHandler proceed) { auto& coro = currentCoro(); defer([&coro, proceed] { proceed([&coro] { coro.resume(); }); }); } 

この機能は何をしますか? 実際には、より便利な使用のために延期呼び出しをラップします( deferdefer 、なぜ使用する必要があるのか​​については以前の記事で説明しています )、つまり、 HandlerではなくProceedHandler 、その中にHandlerが入力パラメーターとして渡され、コルーチンの実行を継続します。 実際、 proceedはオブジェクト内の現在のコルーチンへのリンクを保存し、 coro.resume()を呼び出します。 したがって、すべての作業をコルーチンでカプセル化し、ユーザーはproceed handlerのみで作業する必要があります。

goWait関数にgoWaitます。 したがって、 deferProceed呼び出されると、 deferProceedありproceed 。これは、 handlerの操作の最後に呼び出す必要があります。 私たちがやるべきことは、新しいコルーチンを作成し、その中でハンドラーを実行し、それが完了した後、すぐにproceedを呼び出して、内部でcoro.resume()を呼び出し、元のコルーチンの実行を継続することです。

これにより、フローをブロックせずに待機できますgoWait呼び出し中、現在のコルーチンで操作を一時停止し、転送されたハンドラーが終了すると、何も起こらなかったように実行を継続します。

goWait:いくつかの非同期操作を開始し、それらが完了するのを待ちます


ここで、非同期操作の全体を開始し、それらが完了するのを待つ関数を実装します。

 void goWait(std::initializer_list<Handler> handlers); 

入力時に、非同期で実行する必要があるハンドラーのリストが与えられます。つまり、各ハンドラーはそのコルーチンで開始されます。 前の関数との大きな違いは、すべてのハンドラーが完了した後にのみ、元のコルーチンの実行を継続する必要があることです。 それらのいくつかは、あらゆる種類のミューテックスと条件変数を使用します(実際に実装するものもあります!)が、これを行うことはできません(要件を参照)。したがって、他の実装方法を探します。

アイデアは実際には非常に簡単です。特定の値に達すると、 proceedを呼び出すカウンターを開始する必要がありproceed 。 各ハンドラーは、完了するとカウンターを更新するため、最後のハンドラーは元のコルーチンを実行し続けます。 ただし、1つの小さな問題があります。カウンターを実行中のコルーチンに分割する必要があり、最後のハンドラーはcontinueを呼び出すだけでなく、このカウンターをメモリから削除する必要があります。 これはすべて次のように実装できます。

 void goWait(std::initializer_list<Handler> handlers) { deferProceed([&handlers](Handler proceed) { std::shared_ptr<void> proceeder(nullptr, [proceed](void*) { proceed(); }); for (const auto& handler: handlers) { go([proceeder, &handler] { handler(); }); } }); } 

最初に、古きdeferProceedを起動しますが、少し魔法がその中に隠されています。 shared_ptr設計するとき、データへのポインタだけでなく、オブジェクトを削除するdeleterdeleterことができ、 delete ptrではなくハンドラーを呼び出すことを知っている人はほとんどいません。 実際には、最後に元のコルーチンを継続するために、proceed呼び出しを行います。 この場合、オブジェクト「 nullptr 」を配置するため、オブジェクト自体を削除する必要はありません。 その後、すべてが簡単です。ループ内ですべてのハンドラーを調べ、作成したコルーチンで実行します。 ここにも微妙な違いがあります。 shared_ptrの値を取得します。これにより、コピーが行われ、 shared_ptr内のアトミックリンクカウンターが増加します。 handler終了すると、キャプチャーされたプロシーダーを持つラムダが削除され、カウンターが減少します。 カウンターをゼロに減らして、 deleterオブジェクトを削除する最後の1つは、共有shared_ptr deleterを呼び出します。 deleter 、最終的にcoro.proceed()



明確にするために、以下は異なるスレッドで2つのハンドラーを開始する例を使用した一連の操作です。



例:再帰的並列フィボナッチ数


使用法を説明するために、次の例を検討してください。 気まぐれを見つけ、フィボナッチ数列を再帰的かつ並行して数えたいとします。 問題ありません:

 int fibo (int v) { if (v < 2) return v; int v1, v2; goWait({ [v, &v1] { v1 = fibo(v-1); }, [v, &v2] { v2 = fibo(v-2); } }); return v1 + v2; } 

スタックオーバーフローが発生することは決してないことに注意してくださいfibo関数の各呼び出しは、独自のコルーチンで発生します。

ウェイター:いくつかの非同期操作を開始し、その完了を待ちます


多くの場合、固定されたハンドラのセットを待つだけでなく、物事の間に何か有用なことをしてからだけ待つ必要があります。 必要なハンドラーの数さえわからない場合もあります。つまり、操作中にハンドラーを作成します。 実際、全体としてプロセッサのグループで動作する必要があります。 これを行うには、次のインターフェイスでWaiterプリミティブを使用できます。

 struct Waiter { Waiter& go(Handler); void wait(); }; 

次の2つの方法しかありません。
  1. go:別のハンドラーを実行します。
  2. wait:実行中のすべてのハンドラーを待ちます。

Waiterオブジェクトの存続期間中に、上記のメソッドを数回実行できます。

実装の考え方はまったく同じです。コルーチンの作業を続行するプロシーダーが必要です。 ただし、少し微妙な点が追加されています。プロシーダーは、実行中のコルーチンとWaiterオブジェクトに分割されています。 したがって、 waitメソッドの呼び出し時に、 Waiter自体のコピーをWaiterする必要があります。 方法は次のとおりです。

 void Waiter::wait() { if (proceeder.unique()) { //  Waiter  proceeder => JLOG("everything done, nothing to do"); return; } defer([this] { //  proceeder     auto toDestroy = std::move(proceeder); //  proceeder   , //   -   }); // proceeder     , //       init0(); } 

繰り返しますが、何もする必要はありません! このshared_ptrありがとう。 アーメン!



例:再帰的並列フィボナッチ数


素材を統合するために、 Waiterを使用した気まぐれの代替実装を検討します。

 int fibo (int v) { if (v < 2) return v; int v1; Waiter w; w.go([v, &v1] { v1 = fibo(v-1); }); int v2 = fibo(v-2); w.wait(); return v1 + v2; } 

別のオプション:

 int fibo (int v) { if (v < 2) return v; int v1, v2; Waiter() .go([v, &v1] { v1 = fibo (v-1); }) .go([v, &v2] { v2 = fibo (v-2); }) .wait(); return v1 + v2; } 

選びたくありません。

goAnyWait:いくつかの非同期操作を開始し、少なくとも1つが完了するのを待ちます


引き続き複数の操作を同時に実行します。 ただし、少なくとも1つの操作が完了するまで正確に期待します。

 size_t goAnyWait(std::initializer_list<Handler> handlers); 

入力では、ハンドラーのリスト、出力では最初に終了したハンドラーの番号が与えられます。

このプリミティブを実装するために、アプローチを少し近代化します。 ここでvoid* ptr == nullptrなく、特定のアトミックカウンターcounter分離しvoid* ptr == nullptr 。 最初は0初期化され0 。 作業が終了すると、各ハンドラーはカウンターをインクリメントします。 そして、値が0から1変わったことが突然判明した場合、彼と彼だけがproceed()を呼び出しました:

 size_t goAnyWait(std::initializer_list<Handler> handlers) { VERIFY(handlers.size() >= 1, "Handlers amount must be positive"); size_t index = static_cast<size_t>(-1); deferProceed([&handlers, &index](Handler proceed) { std::shared_ptr<std::atomic<int>> counter = std::make_shared<std::atomic<int>>(); size_t i = 0; for (const auto& handler: handlers) { go([counter, proceed, &handler, i, &index] { handler(); if (++ *counter == 1) { // , ! index = i; proceed(); } }); ++ i; } }); VERIFY(index < handlers.size(), "Incorrect index returned"); return index; } 

ご想像のとおり、このトリックは、2、3、またはそれ以上のプロセッサを待機する必要がある場合にも使用できます。

goAnyResult:いくつかの非同期操作を開始し、少なくとも1つの結果を待つ


それでは、実際に私たちのタスクに必要な最もおいしいものに移りましょう。 つまり、いくつかの操作を開始し、必要な結果を待つためです。 さらに、ハンドラーは結果を返さない場合があります。 つまり、彼は仕事を終えますが、同時に「まあ、できませんでした」と言うでしょう。

このアプローチでは、追加の複雑さが現れます。 結局のところ、すべてのハンドラーが作業を完了できますが、結果は得られません。 したがって、まず、すべての操作の最後に、目的の結果が得られたかどうかを確認し、次に「空の」結果を返す必要があります。 voidを通知するには、 boost::optional<T_result>を使用しますが、 goAnyResultこのような単純なプロトタイプで取得されます。

 template<typename T_result> boost::optional<T_result> goAnyResult( std::initializer_list< std::function< boost::optional<T_result>() > > handlers) 

ここにはひどいものは何もありません。オプションでT_result返すハンドラーのリストを渡すだけです。 つまり、ハンドラーには署名が必要です。

 boost::optional<T_result> handler(); 

前のプリミティブと比較した状況はわずかに変更されています。 , counter , 1 , «» , . , counter Counter :

 template<typename T_result> boost::optional<T_result> goAnyResult( std::initializer_list< std::function< boost::optional<T_result>() > > handlers) { typedef boost::optional<T_result> Result; typedef std::function<void(Result&&)> ResultHandler; struct Counter { Counter(ResultHandler proceed_) : proceed(std::move(proceed_)) {} ~Counter() { tryProceed(Result()); //    - } void tryProceed(Result&& result) { if (++ counter == 1) proceed(std::move(result)); } private: std::atomic<int> counter; ResultHandler proceed; }; Result result; deferProceed([&handlers, &result](Handler proceed) { std::shared_ptr<Counter> counter = std::make_shared<Counter>( [&result, proceed](Result&& res) { result = std::move(res); proceed(); } ); for (const auto& handler: handlers) { go([counter, &handler] { Result result = handler(); if (result) //       counter->tryProceed(std::move(result)); }); } }); return result; } 

, std::move , tryProceed . , std::move , - . cast- .

, .

, ,



, , , .
:

 struct IScheduler : IObject { virtual void schedule(Handler handler) = 0; }; 

– . , , , . , (. [2] : , , , UI-).




, :

 typedef boost::asio::io_service IoService; struct IService : IObject { virtual IoService& ioService() = 0; }; struct ThreadPool : IScheduler, IService { ThreadPool(size_t threadCount); void schedule(Handler handler) { service.post(std::move(handler)); } private: IoService& ioService(); std::unique_ptr<boost::asio::io_service::work> work; boost::asio::io_service service; std::vector<std::thread> threads; }; 

?
  1. , .
  2. boost::asio::io_service::post .
  3. work , io_service , .
  4. .

, ( ) IService ioService , IoService , boost::asio::io_service . , , .

, . boost::asio::io_service . , , - boost::asio::io_service . , IService , . . , , , ThreadPool IService , . . ThreadPool , , .


. , , Journey ( , ):

 struct Journey { void proceed(); Handler proceedHandler(); void defer(Handler handler); void deferProceed(ProceedHandler proceed); static void create(Handler handler, mt::IScheduler& s); private: Journey(mt::IScheduler& s); struct CoroGuard { CoroGuard(Journey& j_) : j(j_) { j.onEnter0(); } ~CoroGuard() { j.onExit0(); } coro::Coro* operator->() { return &j.coro; } private: Journey& j; }; void start0(Handler handler); void schedule0(Handler handler); CoroGuard guardedCoro0(); void proceed0(); void onEnter0(); void onExit0(); mt::IScheduler* sched; coro::Coro coro; Handler deferHandler; }; 

?

, , :

 void Journey::schedule0(Handler handler) { VERIFY(sched != nullptr, "Scheduler must be set in journey"); sched->schedule(std::move(handler)); } void Journey::proceed0() { //      guardedCoro0()->resume(); } Journey::CoroGuard Journey::guardedCoro0() { return CoroGuard(*this); } //          void Journey::proceed() { schedule0([this] { proceed0(); }); } //   ,     Handler Journey::proceedHandler() { return [this] { proceed(); }; } //    // .   1 void Journey::start0(Handler handler) { schedule0([handler, this] { //    guardedCoro0()->start([handler] { JLOG("started"); //     try { handler(); } catch (std::exception& e) { (void) e; JLOG("exception in coro: " << e.what()); } JLOG("ended"); }); }); } 

defer:

 void Journey::defer(Handler handler) { //   deferHandler = handler; //      coro::yield(); } // deferProceed,   void Journey::deferProceed(ProceedHandler proceed) { defer([this, proceed] { proceed(proceedHandler()); }); } 

すべてがシンプルです! , deferHandler .

 TLS Journey* t_journey = nullptr; void Journey::onEnter0() { t_journey = this; } // .   2 void Journey::onExit0() { if (deferHandler == nullptr) { //   =>  ,   delete this; } else { //       deferHandler(); deferHandler = nullptr; } //  ,       t_journey = nullptr; } 

create :

 void Journey::create(Handler handler, mt::IScheduler& s) { (new Journey(s))->start0(std::move(handler)); } 

, Journey , , . , …


! … , . , . !

:

 void Journey::teleport(mt::IScheduler& s) { if (&s == sched) { JLOG("the same destination, skipping teleport <-> " << s.name()); return; } JLOG("teleport " << sched->name() << " -> " << s.name()); sched = &s; defer(proceedHandler()); } 

:
  1. , , . , , .
  2. , : defer , . , .



Scheduler / Thread Scheduler2 / Thread2 :



これにより何が得られますか? , , , . , UI- , UI, , :

 auto result = someCalculations(); teleport(uiScheduler); showResult(result); teleport(calcScheduler); auto newResult = continueSmartCalculations(result); teleport(uiScheduler); updateResult(newResult); //… 

, UI, , UI-. , , , – , , .





. , . , UI- UI-, . , , , .

 struct Portal { Portal(mt::IScheduler& destination) : source(journey().scheduler()) { JLOG("creating portal " << source.name() << " <=> " << destination.name()); teleport(destination); } ~Portal() { teleport(source); } private: mt::IScheduler& source; }; 

( ), . .

RAII- , , (, UI- ), .

例を見てみましょう:

 ThreadPool tp1(1, "tp1"); ThreadPool tp2(1, "tp2"); go([&tp2] { Portal p(tp2); JLOG("throwing exception"); throw std::runtime_error("exception occur"); }, tp1); 

tp1, tp2. , , tp1 , . !



(, , ), :

 struct Scheduler { Scheduler(); void attach(mt::IScheduler& s) { scheduler = &s; } void detach() { scheduler = nullptr; } operator mt::IScheduler&() const { VERIFY(scheduler != nullptr, "Scheduler is not attached"); return *scheduler; } private: mt::IScheduler* scheduler; }; struct DefaultTag; template<typename T_tag> Scheduler& scheduler() { return single<Scheduler, T_tag>(); } template<typename T> struct WithPortal : Scheduler { struct Access : Portal { Access(Scheduler& s) : Portal(s) {} T* operator->() { return &single<T>(); } }; Access operator->() { return *this; } }; template<typename T> WithPortal<T>& portal() { return single<WithPortal<T>>(); } 

, :

 ThreadPool tp1(1, "tp1"); ThreadPool tp2(1, "tp2"); struct X { void op() {} }; portal<X>().attach(tp2); go([] { portal<X>()->op(); }, tp1); 

X tp2. , X ( return &single<T>() ) . Journey -, !



, . . . , .


. : .

, - ? , . .

? , , . « », – .

. , . :

 struct Alone : mt::IScheduler { Alone(mt::IService& service); void schedule(Handler handler) { strand.post(std::move(handler)); } private: boost::asio::io_service::strand strand; }; 

Alone IService , io_service::strand boost.asio . boost.asio , , . , (mutual exclusion).

Alone , , .

:

 struct MemCache { boost::optional<std::string> get(const std::string& key); void set(const std::string& key, const std::string& val); }; //  ThreadPool common_pool(3); //    Alone mem_alone(common_pool); //     portal<MemCache>().Attach(mem_alone); //     //     auto value = portal<MemCache>()->get(key); //  portal<MemCache>()->set(anotherKey, anotherValue); 

, . , !




, . , . , , .



? , , , (. «»). .

: – . , , , , . ! , , , ?

. , , , - , - , , . .

:

 enum EventStatus { ES_NORMAL, ES_CANCELLED, ES_TIMEDOUT, }; struct EventException : std::runtime_error { EventException(EventStatus s); EventStatus status(); private: EventStatus st; }; 

(. ) , :

 struct Goer { Goer(); EventStatus reset(); bool cancel(); bool timedout(); private: struct State { State() : status(ES_NORMAL) {} EventStatus status; }; bool setStatus0(EventStatus s); State& state0(); std::shared_ptr<State> state; }; 

: , .

- Journey :

 void Journey::handleEvents() { //      if (!eventsAllowed || std::uncaught_exception()) return; auto s = gr.reset(); if (s == ES_NORMAL) return; //   throw EventException(s); } void Journey::disableEvents() { handleEvents(); eventsAllowed = false; } void Journey::enableEvents() { eventsAllowed = true; handleEvents(); } 

, . - , . :

 struct EventsGuard { EventsGuard(); //  disableEvents() ~EventsGuard(); //  enableEvents() }; 

, handleEvents ? :

 void Journey::defer(Handler handler) { //      handleEvents(); deferHandler = handler; coro::yield(); //     handleEvents(); } 

, . - , handleEvents . .

:

 Goer go(Handler handler, mt::IScheduler& scheduler) { return Journey::create(std::move(handler), scheduler); } 

Journey::create Goer :

 struct Journey { // … Goer goer() const { return gr; } // … private: // … Goer gr; }; Goer Journey::create(Handler handler, mt::IScheduler& s) { return (new Journey(s))->start0(std::move(handler)); } // .  1 Goer Journey::start0(Handler handler) { // … return goer(); }   : Goer op = go(myMegaHandler); // … If (weDontNeedMegaHandlerAnymore) op.cancel(); 

op.cancel() , handleEvents() .



, , , Journey , -, , go . , , . : go , defer , deferProceed . ., Journey , TLS.


:

 struct Timeout { Timeout(int ms); ~Timeout(); private: boost::asio::deadline_timer timer; }; 


boost::asio::deadline_timer :

 Timeout::Timeout(int ms) : timer(service<TimeoutTag>(), boost::posix_time::milliseconds(ms)) { //     Goer goer = journey().goer(); //    timer.async_wait([goer](const Error& error) mutable { // mutable,       goer if (!error) //     ,   goer.timedout(); }); } Timeout::~Timeout() { //    timer.cancel_one(); // ,    handleEvents(); } 

RAII-, , .

:

 //   Timeout t(100); // 100  for (auto element: container) { performOperation(element); handleEvents(); } 

100 – !



:

 //   200     Timeout outer(200); portal<MyObject>()->performOp(); { //   100  //       Timeout inner(100); portal<MyAnotherObject>()->performAnotherOp(); //       EventsGuard guard; performGuardedAction(); } 

タスク


. , – «» . .

? :
  1. , .
  2. . , .
  3. – . , .

1


, 1.

:

 Goer Journey::start0(Handler handler) { schedule0([handler, this] { guardedCoro0()->start([handler] { JLOG("started"); try { handler(); } catch (std::exception& e) { (void) e; JLOG("exception in coro: " << e.what()); } JLOG("ended"); }); }); return goer(); } 

. どこ? , ?

答え
 Goer Journey::start0(Handler handler) { + Goer gr = goer(); schedule0([handler, this] { guardedCoro0()->start([handler] { JLOG("started"); @@ -121,7 +122,7 @@ JLOG("ended"); }); }); - return goer(); + return gr; } 





2


. :

 void Journey::onExit0() { if (deferHandler == nullptr) { delete this; } else { deferHandler(); deferHandler = nullptr; } t_journey = nullptr; } 

?

答え
  { @@ -153,8 +154,8 @@ - deferHandler(); - deferHandler = nullptr; + Handler handler = std::move(deferHandler); + handler(); } 




defer handler, .


: (GC)





, , GC . :

 struct A { ~A() { TLOG("~A"); } }; struct B:A { ~B() { TLOG("~B"); } }; struct C { ~C() { TLOG("~C"); } }; ThreadPool tp(1, "tp"); go([] { A* a = gcnew<B>(); C* c = gcnew<C>(); }, tp); 

:
 tp#1: ~C tp#1: ~B tp#1: ~A 

! - , .

, , :

 template<typename T, typename... V> T* gcnew(V&&... v) { return gc().add(new T(std::forward(v)...)); } GC& gc() { return journey().gc; } struct GC { ~GC() { //     for (auto& deleter: boost::adaptors::reverse(deleters)) deleter(); } template<typename T> T* add(T* t) { //    T deleters.emplace_back([t] { delete t; }); return t; } private: std::vector<Handler> deleters; }; 

GC Journey , . : , .

結論


, :
  1. /.
  2. .
  3. .
  4. , .
  5. – : , , . .

, . , . , .

, . : , , , UI, , . , .

. ! , .



コード
github.com/gridem/Synca
bitbucket.org/gridem/synca

C++ Party, Yandex
tech.yandex.ru/events/cpp-party/march-msk/talks/1761

C++ User Group
youtu.be/uUQX5QS1CCg
habrahabr.ru/post/212793

文学
[1] : habrahabr.ru/post/201826
[2] Akka- doc.akka.io/docs/akka/2.1.4/scala/scheduler.html

Source: https://habr.com/ru/post/J240525/


All Articles