C ++でのマルチスレッドオブザーバー(練習)

このパターンの主題には多くのバリエーションがありますが、ほとんどの例はマルチスレッドアプリケーションには適していません。
この記事では、マルチスレッドアプリケーションでパターンを適用した経験を共有し、遭遇した主な問題について説明します。
この記事の目的は、マルチスレッドアプリケーションの作成時に発生する可能性のある問題に開発者の注意を引くことです。 マルチスレッドアプリケーションのコンポーネント間の通信の実装における落とし穴を特定します。
既製のソリューションが必要な場合は、 2009年5月からboostに含まれているSignals2ライブラリに注意してください。
既製で使用できるソリューションを提供しようとはしていません。 それにもかかわらず、資料を読んだ後は、何らかの理由で使用できないまたは望ましくないプロジェクト(ドライバー、低レベルアプリケーションなど)でサードパーティのライブラリを使用せずに実行できます。

サブジェクトエリア


俳優

NotificationSender-メッセージを送信するオブジェクト。
通常、これは状態の変化を通知するワークフローであり、ユーザーインターフェイスに表示する必要があります。
NotificationListener-通知処理を実装するオブジェクト。
通常、これは、バックグラウンドタスクに関連付けられたユーザーインターフェイスの一部の表示を制御するオブジェクトです。
このようなオブジェクトは多数存在する可能性がありますが、動的に接続/切断できます(たとえば、タスクの詳細が表示されるダイアログボックスを開く)
NotificationDispatcher-サブスクライバーとメーリングリストを管理するオブジェクト。

オブジェクト間の相互作用

すべてのサブスクライバーへのメッセージの配布。
サブスクリプションをサブスクライブ/終了するプロセス。
オブジェクトの寿命。
この記事では、同期メッセージング方式について説明します。 つまり、SendMessage関数の呼び出しは同期的に行われ、このメソッドを呼び出すスレッドは、すべてのサブスクライバーがメッセージ処理を完了するまで待機します。 場合によっては、このアプローチは非同期メール送信よりも便利ですが、同時に購読解除には困難が伴います。

最も単純なシングルスレッド実装


typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; }; 

ここで、サブスクライバの一意の識別子はサブスクライバオブジェクトのアドレスです。GetSubscriberId関数は、型変換に関係なく、常に1つのサブスクライバオブジェクトに対して同じ値を返します。

使用例

 class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { wprintf(L"%d\n", *((int*)pContext)); } }; int _tmain(int argc, _TCHAR* argv[]) { CDispatcher Dispatcher; CListener Listener1; CListener Listener2; Dispatcher.Subscribe(&Listener1); Dispatcher.Subscribe(&Listener2); for(int i = 0; i < 5; ++i) { Dispatcher.SendMessage(&i); } Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); return 0; } 

メッセージハンドラー内のサブスクライバーの無効化


この例では、マルチスレッドに関連しない問題があります。 この問題は、MessageHandlerハンドラー内でサブスクライブを解除しようとすると現れます。 この問題は、MessageHandlerを呼び出す前にサブスクライバーのリストをコピーすることで解決されます。

マルチスレッド環境への移行


1つのスレッドで、そのようなコードは非常に安定して動作します。
複数のスレッドが動作するときに何が起こるか見てみましょう。
 CDispatcher g_Dispatcher; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); } return 0; } 

遅かれ早かれ、クラッシュが発生します。
問題は、サブスクライバーの追加/削除と通知の送信です(この例では、CDispatcher :: m_SubscriberListへのマルチスレッドアクセス)。
ここでは、サブスクライバーのリストへのアクセスを同期する必要があります。

サブスクライバーリストアクセス同期


 class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; }; 

アクセス同期は、同期オブジェクト(クリティカルセクションまたはミューテックス)を使用して実装されました。
移植性を高めるために、また起きていることの本質から気を散らさないために、EnterCriticalSectionなどのプラットフォーム依存の関数への直接呼び出しから抽象化します。 これを行うには、CLockクラスを使用します。
C ++例外に対する耐性を得るには、RAIIテクノロジ、つまりCScopeLockerクラスを使用すると便利です。CScopeLockerクラスは、コンストラクターで同期オブジェクトをキャプチャし、デストラクタで解放します。
そのような実装では、プログラムは落ちませんが、別の不快な状況が待っています。

デッドロックとの戦い


バックグラウンドタスクを実行する特定のスレッドがあり、このタスクの進行状況が表示されるウィンドウがあるとします。
通常、スレッドはウィンドウクラスに通知を送信し、ウィンドウクラスはウィンドウメッセージプロシージャのコンテキストで何らかのアクションを開始するSendMessageシステム関数を呼び出します。
SendMessageシステム関数はブロックされており、ウィンドウスレッドにメッセージを送信し、処理を待機します。
リスナーオブジェクトの接続/切断がウィンドウプロシージャのコンテキスト(ウィンドウスレッド内)でも発生する場合、スレッドの相互ブロック(いわゆるデッドロック)が可能です。
このようなデッドロックは、ごくまれにしか再現できません(Subscribe / Unsubscribeを呼び出し、同時に別のスレッドでMessageHandlerを呼び出すとき)
次のコードは、SendMessageシステム関数のブロッキング呼び出しをエミュレートします。

 CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { //   SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { //    (  ) g_Lock.Lock(); g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); g_Lock.Unlock(); } return 0; } 

問題は、メインスレッドがg_Lockグローバル同期オブジェクトをキャプチャし(ウィンドウプロシージャと同様に、ウィンドウストリームのコンテキストで実行する)、Subscribe / Unsubscribeメソッドを呼び出すことです。内部で2番目のCDispatcher :: m_Lock同期オブジェクトをキャプチャしようとします。
この時点で、ワーカースレッドは通知を送信して、CDispatcher :: SendMessage関数でCDispatcher :: m_Lockをキャプチャし、g_Lockグローバル同期オブジェクトをキャプチャしようとします(ウィンドウと同様に、SendMessageシステム関数を呼び出します)。

ウィンドウフローA-> B
ワークフローB-> A

これは、古典的なデッドロックと呼ばれます。
問題は、関数CDispatcher :: SendMessage()にあります。
ここでは、ルールを順守する必要があります-同期オブジェクトのキャプチャ中にコールバック関数を呼び出すことはできません。
そのため、通知を送信するときにロックを解除します。

 void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } } 

加入者の寿命管理


デッドロックを削除すると、別の問題が発生しました-サブスクライブするオブジェクトの寿命です。
Unsubscribeを呼び出した後にMessageHandlerメソッドが呼び出されないという保証はなくなりました。そのため、Unsubscribeを呼び出した直後にサブスクライブオブジェクトを削除することはできません。
この場合、最も簡単な方法は、リンクカウンターを使用してサブスクライブするオブジェクトの有効期間を制御することです。
これを行うには、COMテクノロジを使用できます-ISubknownからCSubscriberインターフェイスを継承し、サブスクライブオブジェクトのリストにATL CComPtrを使用します。つまり、std :: vector <CSubscriber *>をstd :: vector <CComPtr>に置き換えます。
しかし、そのような実装には、それぞれがAddRef / Releaseおよび不要なQueryInterfaceメソッドを実装する必要があるため、サブスクライバクラスの実装に追加のコストがかかりますが、プロジェクトがCOMを積極的に使用している場合、このアプローチには利点があります。
スマートポインターは、リンクカウンターを使用してサブスクライブするオブジェクトの有効期間を制御するのに適しています。

マルチスレッド環境のシンプルな実装


 typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; class CDispatcher { private: typedef std::vector<CSubscriberPtr> CSubscriberList; public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CSubscriberPtr toRelease; CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { toRelease = m_SubscriberList[i]; m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; }; 

この実装では、CSubscriber *の「ベア」ポインターを参照カウンター付きの「スマート」ポインターに置き換えました。これは、ブーストライブラリにあることが判明しました。
また、Unlockを呼び出した後にサブスクライバーオブジェクトのデストラクターを呼び出すために、ToRelease変数をUnsubscribe関数に追加しました(同期オブジェクトをキャプチャしている間、サブスクライバーオブジェクトのデストラクターを含むコールバック関数を呼び出すことはできません)。
SendMessage関数がスマートポインターのリストをコピーすることは注目に値します(コピー後、すべてのポインターは参照カウントを増やし、関数を終了すると減少し、サブスクライブオブジェクトの有効期間を制御します)

テスト中


 CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { //   SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); for(;;) { boost::shared_ptr<CListener> pListener1(new CListener); boost::shared_ptr<CListener> pListener2(new CListener); //    (  ) g_Lock.Lock(); g_Dispatcher.Subscribe(pListener1); g_Dispatcher.Subscribe(pListener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId()); g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId()); g_Lock.Unlock(); } return 0; } 

マルチスレッド環境向けに最適化された実装


原則として、SendMessage関数は、Subscribe / Unsubscribeよりもはるかに頻繁に呼び出されます。 多数のサブスクライバーの場合、ボトルネックはSendMessage内のサブスクライバーのリストをコピーすることです。
購読者のリストのコピーは、購読/購読解除機能に転送できます。 これは、ロックフリーアルゴリズムの手法に似ています。
CDispatcherオブジェクトは、サブスクライバーのリストを直接保存するのではなく、スマートポインターを使用して保存します。 SendMessage関数内で、サブスクライバーの現在のリストへのポインターを取得して操作します。 Subscribe / Unsubscribe関数では、毎回新しいサブスクライバーのリストを作成し、CDispatcherオブジェクト内のポインターを新しいサブスクライバーのリストにリダイレクトします。 したがって、CDispatcherオブジェクト内のサブスクライバーのリストへのポインターは既にサブスクライバーの新しいリストを指しますが、SendMessage関数は引き続き古いリストで機能します。 サブスクライバーの古いリストを変更するユーザーはいないため、すべてがマルチスレッド環境で安定して動作します。
原則として、Subscribe / Unsubscribeの機能をわずかに変更し、完全にロックフリーのアルゴリズムを実装できますが、これは別のトピックです。
購読解除Medotは非同期であり、完了後にメール送信の完全な停止を保証するものではありません。半分の解決策です。購読者はUnsubscribeHandler関数を使用して購読解除に関する通知を受け取ります。 この動作を実装するために、デストラクタでUnsubscribeHandler関数を呼び出す中間クラスCSubscriberItemが追加されました。
 namespace Observer { ////////////////////////// // Subscriber ////////////////////////// typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; virtual void UnsubscribeHandler() = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; ////////////////////////////////////////////////////////////////////// // Dispatcher /////////////////////////////////// class CDispatcher { private: class CSubscriberItem { public: CSubscriberItem(CSubscriberPtr pSubscriber) :m_pSubscriber(pSubscriber) { } ~CSubscriberItem() { m_pSubscriber->UnsubscribeHandler(); }; CSubscriberPtr Subscriber()const {return m_pSubscriber;} private: CSubscriberPtr m_pSubscriber; }; typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr; typedef std::vector<CSubscriberItemPtr> CSubscriberList; typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr; public: CDispatcher() { } private: CDispatcher(const CDispatcher&){} CDispatcher& operator=(const CDispatcher&){return *this;} public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { //Declaration of the next shared pointer before ScopeLocker //prevents release of subscribers from under lock CSubscriberListPtr pNewSubscriberList(new CSubscriberList()); //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(m_pSubscriberList) { //Copy existing subscribers pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end()); } for(size_t i = 0; i < pNewSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } //Add new subscriber to new subscriber list pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber))); //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { //Declaration of the next shared pointers before ScopeLocker //prevents release of subscribers from under lock CSubscriberItemPtr pSubscriberItemToRelease; CSubscriberListPtr pNewSubscriberList; //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return false; } pNewSubscriberList = CSubscriberListPtr(new CSubscriberList()); for(size_t i = 0; i < m_pSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == id) { pSubscriberItemToRelease = pSubscriberItem; } else { pNewSubscriberList->push_back(pSubscriberItem); } } //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; if(!pSubscriberItemToRelease.get()) { return false; } return true; } void SendMessage(void* pContext) { CSubscriberListPtr pSubscriberList; { CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return; } //Get shared pointer to an existing list of subscribers pSubscriberList = m_pSubscriberList; } //pSubscriberList pointer to copy of subscribers' list for(size_t i = 0; i < pSubscriberList->size(); ++i) { (*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext); } } private: CSubscriberListPtr m_pSubscriberList; CLock m_Lock; }; }; //namespace Observer 

参照資料


Boostライブラリ:: Signals2の記事
スマートポインタージェフアルジャー
リソース取得は初期化(RAII) ウィキペディア
この記事の最初のバージョンに関するコメントはこちらにあります。

Source: https://habr.com/ru/post/J108857/


All Articles