メールボックスは、メールボックスではありません...

SObjectizerに関する最初の記事が2016年の夏に作成されたとき、興味のある読者が「 裏側 」を見ることができるように、時間の経過とともにその実装の詳細についても話します。 本日の記事では、SObjectizerの内臓について説明します。 mbox-s(「メールボックス」)のメカニズムについて。これは、アクター(用語ではエージェント)の相互作用を整理するために使用されます。

なぜmboxが特別なのですか?


なぜなら、このメカニズムがSObjectizerの学習を引き受ける人に対して非常によく似た質問をいくつ提起するのか、私たち自身が驚いているからです。 SObjectizerの開発者である私たちによく知られ、理解でき、馴染みのあるものは、初心者にとって決してそうではないことが判明しました。 そうだとすれば、mboxが何であり、どのように機能するかを理解してみましょう。 そして同時に、独自のmboxを作成してみてください。

なぜmboxが必要なのですか?


エージェント間の相互作用を整理するには、SObjectizerのメールボックスが必要です。 エージェント間の通信は非同期メッセージによって構築され、これらの同じメッセージをどこかに送信する必要があります。 「どこに正確に?」という疑問が生じます。

従来のアクターモデルでは、メッセージの受信者は受信者アクターです。 すなわち アクターAがアクターBにメッセージを送信するには、アクターAがアクターBへのリンクを持っている必要があります。受信者アクターへの参照はありません-メッセージを送信する方法はありません。 1:Nメーリングを実行する必要がある場合、送信者にはすべての受信者へのリンクが必要です。 これは、古典的な俳優のモデルについて話す場合です。

しかし、我々は異なる特異性を持っていました(通常、ビートは意識を決定し、解決しなければならないタスクと自由に使えるツールのニーズに導かれました)。

まず、C ++があります。 エージェントBへのリンクをエージェントAに渡さないというだけです。 これが通常のリンク(または通常のベアポインター)である場合、エージェントBが破棄されると、エージェントAはBへの「ダングリング」リンクを持ちます。したがって、通常のリンク/ポインターの代わりに、スマートリンク/ポインターを使用する必要があります。 しかし、単純なスマートポインターは良くありません。 エージェントAはエージェントBへのスマートポインターを持っている限り、エージェントBは削除されません(つまり、所有しているリソースは解放されません)。

したがって、C ++では、スマートポインターだけでなく、いくつかの特別なスマートプロキシリンクを使用する必要があります。 エージェントAはBへのプロキシリンクを持っている場合がありますが、Aがまだプロキシリンクを持っている場合でも、Bを安全に削除できます。 さらに、Aは存在しないエージェントBにメッセージを送信しようとする場合があり、この試みは壊滅的な結果(他の誰かのメモリを破損したり、C ++でぶら下がっているリンクにアクセスするときのようにアプリケーション全体がクラッシュするなど)につながることはありません。

次に、1対Nの相互作用が非常に一般的でした。 さらに、最初は一般的にエージェントが相互作用する唯一の方法でした。 したがって、エージェントAから情報を受信する必要があるエージェントBおよびCが、最初に自分自身へのリンクをエージェントAに送信することを強制されたくありませんでした。そして、そのエージェントAは、モードでAからのメッセージを受信したいエージェントのリストを個別に維持する必要がありました1:N

その結果、「メールボックス」の概念がありました。これは、a)エージェントが互いに通信するために使用できる非常にスマートなプロキシリンクであり、b)モード1での相互作用を簡素化するメカニズムであるためだけに作成されました:N

mbox-sが存在する場合、エージェントはメッセージを互いに直接送信するのではなく、メールボックス(mbox-s)で送信します。 mboxに送信されたメッセージは、このmboxからのメッセージを購読しているエージェントに配信されます。

したがって、エージェントAがエージェントBにメッセージを送信するには、両方のエージェントが認識しているmboxが必要です。 エージェントAはこのmboxにメッセージを送信し、エージェントBはこのmboxからのメッセージをサブスクライブします。 この小さな例でわかるように:

#include <so_5/all.hpp> class A final : public so_5::agent_t { const so_5::mbox_t to_; public: A(context_t ctx, so_5::mbox_t to) : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {} virtual void so_evt_start() override { //    B. so_5::send<std::string>(to_, "Hello!"); } }; class B final : public so_5::agent_t { public: B(context_t ctx, const so_5::mbox_t & from) : so_5::agent_t{std::move(ctx)} { //       from. so_subscribe(from).event(&B::on_string); } private: void on_string(mhood_t<std::string> cmd) { std::cout << "Message: " << *cmd << std::endl; //   . so_deregister_agent_coop_normally(); } }; int main() { so_5::launch([](so_5::environment_t & env) { //      ,   //  mbox  . env.introduce_coop([&](so_5::coop_t & coop) { //  mbox,     //  A  B. const auto mbox = env.create_mbox(); //    ,    //  mbox  . coop.make_agent<A>(mbox); coop.make_agent<B>(mbox); }); }); return 0; } 

さらに、1:Nモードでメッセージを送信、受信することは、1:1モードでメッセージを送信/受信することと変わりません。 上記の例は、エージェントAがエージェントBとCに同時にメッセージを送信する場合をどのように探すかを示しています。

 #include <so_5/all.hpp> class A final : public so_5::agent_t { const so_5::mbox_t to_; public: A(context_t ctx, so_5::mbox_t to) : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {} virtual void so_evt_start() override { //    B. so_5::send<std::string>(to_, "Hello!"); } }; class B final : public so_5::agent_t { public: B(context_t ctx, const so_5::mbox_t & from) : so_5::agent_t{std::move(ctx)} { //       from. so_subscribe(from).event(&B::on_string); } private: void on_string(mhood_t<std::string> cmd) { std::cout << "(B) Message: " << *cmd << std::endl; //   . so_deregister_agent_coop_normally(); } }; class C final : public so_5::agent_t { public: C(context_t ctx, const so_5::mbox_t & from) : so_5::agent_t{std::move(ctx)} { //       from. so_subscribe(from).event([](mhood_t<std::string> cmd) { //   ,    , //      B. std::cout << "(C) Message: " << *cmd << std::endl; }); } }; int main() { so_5::launch([](so_5::environment_t & env) { //      ,   //  mbox  . env.introduce_coop([&](so_5::coop_t & coop) { //  mbox,     //  A, B, C. const auto mbox = env.create_mbox(); //   ,    //  mbox  . coop.make_agent<A>(mbox); coop.make_agent<B>(mbox); coop.make_agent<C>(mbox); }); }); return 0; } 

mboxはどのように機能しますか?


異なるmbox-sの動作は異なります:)したがって、最も広く使用されているmbox-sのタイプがどのように機能するかを知るには、最初にmbox-sの一般的な内容について話す必要があります。

mboxとは何ですか?


マルチプロデューサー/マルチコンシューマー


歴史的に、これはSObjectizer-5に登場した最初のタイプのmboxです。 誰でもそのようなmboxにメッセージを送信できます。 誰でもこのmboxからのメッセージを購読できます。

マルチプロデューサー/シングルコンシューマー


1:1の対話では、MPSC-mboxを使用できます。MPSC-mboxでは誰でもメッセージを送信できますが、MPSC-mboxを所有する1人のエージェントはMPSC-mboxからのメッセージをサブスクライブできます。

MPSC mboxは、SObjectizer-5のアクティブな使用の開始後しばらくしてSObjectizer-5に登場しました。 メッセージが特定の1つのエージェントに宛てられている場合、MPMC-mboxの使用は効果的でないことが経験により示されています。 そのため、MPSC-mboxは、相互作用の組織に対する根本的に異なるアプローチではなく、コードを最適化するための方法であると言えます。 さらに、ユーザーはMPSC-mbox-sを作成できません。 各エージェントのMPSC-mboxは、SObjectizerによって自動的に作成されます。

so_5_extraライブラリからの追加のmbox


追加のso_5_extraライブラリがSObjectizerを介して構築されました。これには、SObjectizerをカーネルに追加するのは不合理と思われるコンポーネントが含まれています。 いくつかの追加タイプのmbox-sが含まれています。 例:


mboxを使用する別の興味深い例は、so_5_extra のshutdownerコンポーネントです。 このコンポーネントで mboxを使用して、大規模なSObjectizerアプリケーションを正しくシャットダウンできるタイミングを決定します。

ただし、この記事ではso_5_extraのmbox-sを詳細に検討しません。

Multi-Producer / Single-Consumer mboxはどのように機能しますか?


そのため、mbox-sは異なるため、動作が異なります。 そして、最も単純なMPSC-mboxで作業の詳細を検討し始めます。

message_limits (これはエージェントを過負荷から保護するメカニズムです)やmsg_tracing (これはメッセージが受信者に配信される方法を調べる方法です)などの特定のことを考慮しない場合、MPSC-mboxは単純な「半導体」のように機能します。送信されたメッセージを受信者エージェントに送信して、受信者がメッセージをキューに入れてメッセージ処理を待機するようにします。

まあつまり ここではすべてが非常に愚かです。送信者からメッセージを受け取り、受信者にそれを渡しました。 これ以上。

Multi-Producer / Multi-Consumer mboxはどのように機能しますか?


しかし、MPMC-mboxでは、状況はもう少し複雑です(ここでも、message_limitsやmsg_tracingなどを考慮していません)。 多くのメッセージ受信者が存在する可能性があるため、MPMC-mboxは、サブスクライバーとの関連付けコンテナーを保存します。 このコンテナ内のキーはメッセージタイプの識別子であり、要素はこのタイプのメッセージのサブスクライバの実際のリストです。

誰かがタイプMのメッセージを送信すると、MPMC-mboxはそのMのメッセージのサブスクライバーのリストを連想コンテナーで探します。そのようなリストがある場合、MPMC-mboxはこのリストを調べて、各サブスクライバーにメッセージを送信しようとします。

具体的には「与えようとする」と言われました。 delivery_filters (つまり、メッセージの内容に応じて、サブスクライバーへのメッセージ配信を有効または無効にするフィルター)などがまだあります。 メッセージがサブスクライブするエージェントに配信される前に、MPMC-mboxは、サブスクライバーにdelivery_filterがあるかどうかを確認します。 存在する場合、メッセージは最初にフィルターに送信されます。 また、フィルターがエージェントへのメッセージの配信を許可する場合にのみ、このメッセージはエージェントに送信されます。

一般に、MPMC-mboxは特定の種類のメッセージのサブスクライバーのリストを調べ、メッセージの特定のインスタンスへの配信がサブスクライバーに許可されている場合、メッセージはサブスクライバーエージェントに送信され、サブスクライバーはメッセージの処理を順番に待機します。

通常のMPMCとMPSC mboxの共通点は何ですか?


通常のMPMCおよびMPSC mboxには、それらを結合する1つの重要な機能があります。mboxには、mboxに送信されるメッセージの独自のリポジトリがありません。 すなわち 少なくとも通常のmboxは、メッセージを保存しません。 一般的に。

したがって、「 mboxがオーバーフローするまで何件のメッセージを保存でき、オーバーフローするとどうなりますか? 」または「 メッセージMが送信された後にメッセージMをサブスクライブすると、エージェントBはメッセージMを受信しますか? 」 MPMCとMPSC mboxは意味がありません。 これらのmboxについては、愚かなことにメッセージを自身の内部に保存しません。メッセージは、メッセージに興味があるエージェントにすぐに転送されます。 または、このタイプのメッセージの受信者が現在いない場合、メッセージを無視します。

また、他のタイプのmboxの場合、送信されたメッセージの内部ストレージは標準ではなくルールの例外です。 実際、mboxの操作はプッシュ原則に基づいています。送信されたメッセージはmboxに「詰め込まれ」ます。 そして、これはおそらく、mboxが誰かにメッセージを配信する唯一の機会です。 誰もmboxに新しいものが現れたかどうかを確認するためにmboxを定期的にプルすることはありません。 すなわち 誰も、まったく、誰も、プルモードのmboxでは動作しません。

結論として、一般的に、mbox-sはメッセージを内部に保存しません。

世界の状況を複雑にしている:エージェントには独自のメッセージキューがない


SObjectizerに直面した開発者は、mboxがメッセージを保存しないことをすぐに認識し始めるため、mboxの容量について質問することは意味がありません。 しかし、メッセージはmboxには保存されず、エージェントと共に保存されるため、エージェントのメッセージキューの容量に関する質問が始まります...

そして、ここで新規参入者はもう一つの啓示を受け、おそらく失望するでしょう:SObjectizerでは、エージェントは一般的な場合、独自のメッセージキューを持っていません。

行くぞ すべてではありません:)

実際、SObjectizerのエージェントのメッセージキューはディスパッチャによって制御されます。 エージェントがメッセージを処理する作業コンテキストをエージェントに提供するのはディスパッチャです。 そして、結果として、処理を待機しているメッセージのストレージを編成するのはディスパッチャです。

たとえば、one_thread(最も一般的に使用されるものの1つ)のようなディスパッチャがあります。 このディスパッチャに関連付けられているすべてのエージェントは、単一の共通の作業スレッドで動作します。 そして、すべてのエージェントのすべてのメッセージは、1つの共通メッセージキューに保存されます。 作業スレッドはこのキューから次のメッセージを取得し、受信エージェントに渡し、次のメッセージを取得します。

active_groupタイプのディスパッチャは、エージェントのグループを1つの共通の作業スレッドに関連付けることができる同様の方法で動作します。 そして、この作業スレッド上のすべてのエージェントは、共通のメッセージキューを使用します。

厄介な状況は、thread_poolおよびadv_thread_poolディスパッチャーの場合です。 そこで、エージェントのFIFOキューのパラメーターを設定できます。 それらの1つは、エージェントが使用するキューです。 エージェントに独自のキューを持たせることができます。このキューには、この特定のエージェントに宛てられたメッセージのみが存在します。 また、同じ協力関係にあるエージェントが共通のメッセージキューを共有するようにできます。

エージェントの優先順位をサポートするディスパッチャでさらに楽しくなります。 たとえば、ディスパッチャprio_one_thread :: strictly_ordered。 そこでは、同じ優先度を持つすべてのエージェントに共通のメッセージキューが1つあります。 ただし、優先順位が異なるエージェントの場合、メッセージキューは異なります。

要するに、最終行:一般的に、mboxはエージェントにメッセージを送信し、エージェントは既に適切なキューにメッセージを保存しているディスパッチャにメッセージを送信します。 したがって、ここでも、一般的な場合、mbox-sもエージェントにもメッセージ用のストレージがありません。

また、独自のmboxを作成するのはどれくらい難しいですか?


message_limits、delivery_filters、msg_tracingおよびその他のニュアンスをサポートして「すべてのルールで」それを行う場合、それは非常に困難です。 たとえば、 so_5_extraのretain_msg mboxの実装の内臓を見て、これがどれだけ怖いのかを見ることができます:)

ただし、特定のタスクのために独自のmboxが実行された場合、すべてがそれほどひどいものになることはありません。 MPSC-mboxを小さな例として作成してみましょう。これにより、エージェントが頻繁にメッセージを受信するのを防ぐことができます。 たとえば、M1メッセージの後に250ミリ秒未満でM2メッセージが到着した場合、そのメッセージは破棄されます。 M1が250ミリ秒以上経過した後、M2が受信者に配信されます。

必要な説明


それでは、コード名anti-jitter-mboxで独自のmboxを作成してみましょう。 これはMPSC-mboxになり、特定のエージェントに関連付ける必要があります。

私たちの生活を簡素化するために、MPSC-mboxの独自の完全な実装は作成しません。 代わりに、各エージェント用の既製のMPSC-mboxを使用します。 anti-jitter-mboxがMPSC-mboxコンストラクターに属するエージェントを、anti-jitter-mboxのコンストラクターに渡すだけです。

独自のクラスanti_jitter_mboxを定義する必要があります。これは特別なクラスso_5 :: absctract_message_mbox_tの子孫でなければなりません。 このクラスでは、absctract_message_mbox_tにある純粋な仮想メソッドをオーバーライドする必要があります。 SObjectizerバージョン5.5。*では、次のメソッドがあります。

id() 。 一意のmbox IDを返す必要があります。 メッセージ配信は、実際にはコンストラクタで渡されるMPSC-mboxエージェントによって実行されるため、この特定のMPSC-mboxのIDを返します。 すなわち ここでは、単に現在のMPSC-mboxに作業を委任します。

subscribe_event_handler() 。 このメソッドは、エージェントがタイプTのメッセージをサブスクライブするときに呼び出されます。このメソッドにタイプTを登録します。あるタイプMのメッセージがmboxに到着したときに、エージェントがサブスクライブしているかどうかを確認できるようにします。 署名されている場合、メッセージの配信を試みることができます(それに応じて、最後の配信の時刻を修正する必要があります)。 署名されていない場合、メッセージは無視する必要があります。

unsubscribe_event_handlers() 。 subscribe_event_handler()とは対照的に、このメソッドは、エージェントがタイプTのメッセージをサブスクライブ解除するときに呼び出されます。このメソッドでタイプTの登録をキャンセルします。

query_name() 。 このメソッドは、mboxの文字列名を返す必要があります。 このメソッドは、デバッグと診断の目的に役立ちます。 たとえば、SObjectizerは、発生したエラーに関するメッセージを生成するときにこのメソッドをプルできます。

タイプ() 。 このメソッドは、mboxタイプを返します。mboxがマルチプロデューサー/マルチコンシューマーか、マルチプロデューサー/シングルコンシューマーかを返します。 このメソッドは、特定のアクションを実行できるかどうかを確認するためにSObjectizerによって呼び出されます。 たとえば、 変更可能なメッセージはMPSC mboxにのみ送信できます

do_deliver_message() 。 このメソッドは、受信エージェントにメッセージを送信します。 このメソッドでは、送信されたメッセージのタイプが登録されているかどうかを確認する必要があります。 そうでない場合、メッセージは無視されます。 登録されていて、最後にメッセージが配信されてから十分な時間が経過した場合、メッセージを受信者に配信する必要があります(この場合、配信時間を修正します)。 配信自体は、実際のMPSC-mboxエージェントに委任されます。

do_deliver_service_request() 。 このメソッドはdo_deliver_message()に似ていますが、エージェントAがエージェントBに同期リクエストを行ったときに呼び出されます(つまり、send_messageの代わりにrequest_futureまたはrequest_valueが使用されます)。 簡単にするため、anti-jitter-mboxの同期リクエスト機能はサポートしません。

set_delivery_filter()およびdrop_delivery_filter() これらのメソッドは、メッセージ配信フィルターを設定および削除するために使用されます。 配信フィルターはMPSC-mbox向けではないため、この例ではこの機能をサポートしません。

いくつかのabstract_message_mbox_tメソッドの一貫性の説明


この例の実装では、do_deliver_message()およびdo_deliver_service_request()メソッドが定数として宣言されていることがわかります。 しかし、なぜなら do_deliver_message()では、anti-jitter-mboxの内部状態を変更する必要があります。次に、mboxクラスの説明でこの同じ状態を可変としてマークする必要があります。

これは、abstract_message_mbox_tクラスのインターフェースの形成における古代のアーキテクチャの誤算の結果です。 このクラスが何年も前に形成されたとき、誰かが独自のタイプのmboxを作成する必要があるとは考えていませんでした。

1年半前に必要であるだけでなく、時には非常に必要であることが判明したとき、SObjectizer-5.5ブランチ内の互換性を破るか、すべてをそのままにして、将来のメジャーリリースでabstract_message_mbox_tインターフェースを変更するという選択に直面しました(SObjectizer-5.6など)。 同じブランチ内のリリース間の互換性を維持することについては非常に重要なので、SObjectizer-5.5ではすべてをそのままにすることにしました。 そのため、独自のmboxを実装する場合、いくつかのabstract_message_mbox_tメソッドの不変性を考慮し、mutableキーワードを使用する必要があります。

独自のanti-jitter-mboxの実装


さて、私たちは自分のmboxがどのようになるかをすでに見ることができます。

mboxが処理するデータから始めましょう:

 using namespace std::chrono; using clock_type = steady_clock; class anti_jitter_mbox : public so_5::abstract_message_box_t { //        ,   //           // . struct data { //      . struct item { //      .  0 , //          . std::size_t subscribers_{0}; //    . //   ,       . std::optional<clock_type::time_point> last_received_{}; }; //   ,     //   . using message_table = std::map<std::type_index, item>; //   mutex    mbox-   // . std::mutex lock_; //  ,    . message_table messages_; }; //  mbox-. //  mbox,       . const so_5::mbox_t mbox_; //     "" . const clock_type::duration timeout_; //   mbox-.  SObjectizer-5.5    //  mutable, ..         // const-. mutable data data_; 

受信者エージェントへのメッセージ配信が行われる実際のmbox、「不要な」メッセージを遮断するための時間しきい値、および実際にメッセージのタイプと最後に受信された時間に関する情報が必要です。 さらに、mboxメソッドは異なる作業スレッドで呼び出すことができ、mboxにスレッドセーフを提供する必要があるため、ミューテックスが必要です。

ところで、ほとんどのメソッドでスレッドセーフであるという理由だけで、mboxの内部ミューテックスをキャプチャする必要があります。 私たちの生活を簡素化するために、補助テンプレートメソッドを作成します。このメソッドは、ミューテックスをキャプチャし、キャプチャしたミューテックスの下で必要なアクションを実行します。

  template<typename Lambda> decltype(auto) lock_and_perform(Lambda l) const noexcept { std::lock_guard<std::mutex> lock{data_.lock_}; return l(); } 

原則として、その存在は必要ありません。 しかし、次の理由でそれを使用することにしました。実装を簡単にするために、例外の安全性などを気にしません。 一部のアクション中に例外が発生した場合、アプリケーション全体を中断するだけです。 lock_and_performはnoexceptとしてマークされ、この動作を提供します-ラムダが例外をスローすると、C ++ランタイム自体がstd :: terminateを呼び出します。

さて、mbox自体の実際の実装を見ることができます:

 public: // .    MPSC-mbox,  //      . anti_jitter_mbox( so_5::mbox_t actual_mbox, clock_type::duration timeout) : mbox_{std::move(actual_mbox)} , timeout_{timeout} {} //  ID mbox-.     ID  mbox-. so_5::mbox_id_t id() const override { return mbox_->id(); } //    . void subscribe_event_handler( const std::type_index & msg_type, const so_5::message_limit::control_block_t * limit, so_5::agent_t * subscriber ) override { lock_and_perform([&]{ //      .    //   ,     . auto & msg_data = data_.messages_[msg_type]; msg_data.subscribers_ += 1; //     mbox-. mbox_->subscribe_event_handler(msg_type, limit, subscriber); }); } //   . void unsubscribe_event_handlers( const std::type_index & msg_type, so_5::agent_t * subscriber ) override { lock_and_perform([&]{ //      . //    ,     . auto it = data_.messages_.find(msg_type); if(it != data_.messages_.end()) { auto & msg_data = it->second; --msg_data.subscribers_; if(!msg_data.subscribers_) //    ,    //       . data_.messages_.erase(it); //  mbox      . mbox_->unsubscribe_event_handlers(msg_type, subscriber); } }); } //   mbox-. std::string query_name() const override { return "<mbox:type=anti-jitter-mpsc:id=" + std::to_string(id()) + ">"; } //   mbox-.  ,    . so_5::mbox_type_t type() const override { return mbox_->type(); } //     . void do_deliver_message( const std::type_index & msg_type, const so_5::message_ref_t & message, unsigned int overlimit_reaction_deep ) const override { lock_and_perform([&]{ //       . //    ,      //    . auto it = data_.messages_.find(msg_type); if(it != data_.messages_.end()) { auto & msg_data = it->second; const auto now = clock_type::now(); // ,    . //      (..  last_received_ //  ),   . bool should_be_delivered = true; if(msg_data.last_received_) { should_be_delivered = (now - *(msg_data.last_received_)) >= timeout_; } //  - ,     mbox  //        // . if(should_be_delivered) { msg_data.last_received_ = now; mbox_->do_deliver_message(msg_type, message, overlimit_reaction_deep); } } }); } //    . void do_deliver_service_request( const std::type_index & /*msg_type*/, const so_5::message_ref_t & /*message*/, unsigned int /*overlimit_reaction_deep*/ ) const override { //  ,    so_5::exception_t   //  ,     SObjectizer-. SO_5_THROW_EXCEPTION(so_5::rc_not_implemented, "anti-jitter-mbox doesn't support service requests"); } //    MPSC-mbox-  .   //   . void set_delivery_filter( const std::type_index & /*msg_type*/, const so_5::delivery_filter_t & /*filter*/, so_5::agent_t & /*subscriber*/ ) override { SO_5_THROW_EXCEPTION(so_5::rc_not_implemented, "anti-jitter-mbox doesn't support delivery filters"); } void drop_delivery_filter( const std::type_index & /*msg_type*/, so_5::agent_t & /*subscriber*/ ) noexcept override { SO_5_THROW_EXCEPTION(so_5::rc_not_implemented, "anti-jitter-mbox doesn't support delivery filters"); } }; 

さて、mboxの動作をテストするには、互いに類似した2つのテストエージェントを作成する必要があります。最初のエージェントのみが、通常のMPSC-mboxからメッセージを受信する必要があります。

 class ordinary_subscriber final : public so_5::agent_t { const std::string name_; public: ordinary_subscriber(context_t ctx, //  ,    . std::string name) : so_5::agent_t{std::move(ctx)} , name_{std::move(name)} { so_subscribe_self().event([&](mhood_t<std::string> cmd) { std::cout << name_ << ": signal received -> " << *cmd << std::endl; }); } // Mbox,      . auto target_mbox() const { return so_direct_mbox(); } }; 

そして、この目的のための2番目のエージェントはanti-jitter-mboxを使用します。

 class anti_jitter_subscriber final : public so_5::agent_t { const std::string name_; const so_5::mbox_t anti_jitter_mbox_; public: anti_jitter_subscriber(context_t ctx, //  ,    . std::string name, //  ,     //  "" . clock_type::duration jitter_threshold) : so_5::agent_t{std::move(ctx)} , name_{std::move(name)} , anti_jitter_mbox_{ new anti_jitter_mbox{so_direct_mbox(), jitter_threshold}} { //     mbox. so_subscribe(anti_jitter_mbox_).event([&](mhood_t<std::string> cmd) { std::cout << name_ << ": signal received -> " << *cmd << std::endl; }); } // Mbox,      . auto target_mbox() const { return anti_jitter_mbox_; } }; 

さて、これがテスト実行のためにすべて起動される方法です:

 //       . void generate_msg_sequence( so_5::environment_t & env, const so_5::mbox_t & ordinary_mbox, const so_5::mbox_t & anti_jitter_mbox) { std::vector<milliseconds> delays{ 125ms, 250ms, 400ms, 500ms, 700ms, 750ms, 800ms }; for(const auto d : delays) { const std::string msg = std::to_string(d.count()) + "ms"; so_5::send_delayed<std::string>(env, ordinary_mbox, d, msg); so_5::send_delayed<std::string>(env, anti_jitter_mbox, d, msg); } } int main() { //  SObjectizer    . so_5::launch([](so_5::environment_t & env) { //    mbox-.      //     . so_5::mbox_t ordinary, anti_jitter; //    ,      //  mbox. env.introduce_coop([&](so_5::coop_t & coop) { ordinary = coop.make_agent<ordinary_subscriber>( "ordinary-mbox")->target_mbox(); anti_jitter = coop.make_agent<anti_jitter_subscriber>( "anti-jitter-mbox", 250ms)->target_mbox(); }); //     . generate_msg_sequence(env, ordinary, anti_jitter); //       . std::this_thread::sleep_for(1250ms); //    . env.stop(); }); return 0; } 

サンプルを実行した結果、anti-jitter-mboxを使用するエージェントは、通常のmboxを使用するエージェントよりも少ないメッセージを処理することがわかります。

 通常のmbox:受信信号-> 125ms
anti-jitter-mbox:受信信号-> 125ms
通常のmbox:受信した信号-> 250ms
通常のmbox:受信した信号-> 400ms
anti-jitter-mbox:受信信号-> 400ms
通常のmbox:信号を受信-> 500ms
通常のmbox:受信信号-> 700ms
anti-jitter-mbox:受信信号-> 700ms
通常のmbox:信号受信-> 750ms
通常のmbox:受信した信号-> 800ms 

例付きのリポジトリ


この記事の例の完全なソースコードは、このリポジトリにあります

エピローグ


記事の終わりに、最後まで読み通す忍耐を持った読者に感謝したいと思います。それはあなたがそれをやった、ありがとう、簡単ではありませんでした:)

私たちは、ほんの少しのニュースを共有したい:SObjectizerとso_5_extraの更新しますバージョン5.5.20までのSObjectizer、バージョン1.0.3までのso_5_extra。SObjectizerは、vcpkg依存関係管理システムからも利用できますしたがって、vcpkg install sobjectizerを使用してSObjectizerをインストールできます。

また、来年、SObjectizerの次のメジャーバージョンである、バージョン5.6で作業を開始する予定です。バージョン5.6では、互換性のためにプルする古いロードを削除し、場合によってはバージョン5.5との互換性を破壊します。SObjectizer-5.6のいくつかの予備的な考慮事項をここに示しますSObjectizerの将来のバージョンで何を見たいか、SObjectizerの開発のどの方向があなたに興味があるかについて、SObjectizerに興味がある人の意見を聞くことは素晴らしいことです。

誰かがSObjectizerの動作の詳細を知りたい場合は、コメントで質問してください。答えようとします。また、答えに長いテキストが必要な場合は、別の記事が見つかるかもしれません。

Source: https://habr.com/ru/post/J344580/


All Articles