ユニバーサルロックフリーオブジェクトに関する多くの記事がありますが、いくつかの特別な場合には、それらは不必要に面倒です。 私の場合は、それだけでした。あるストリームから別のストリームへの情報の一方向の送信を整理する必要がありました。 メインスレッドはワーカーによって開始されます。その後、彼は停止を要求することしかできなくなり、管理できなくなります。 次に、ワーカースレッドは、現在の状態(実行の進行状況)をメインに通知し、実行の中間結果を送信できます。 ワーカーからメインストリームへのデータ転送のみが必要であることがわかりました。
もちろん、おそらく私は自転車を、あるいはもっと悪いことにグリッチのある自転車を発明しました。 したがって、コメントと批判は大歓迎です!
状態オブジェクト
ワークフローの状態はクラスとして表されます。 同時に、メインスレッドは常に状態オブジェクトに保存されたデータを取得する義務はありません(たとえば、メインスレッドが進行状況の中間値をスキップするかどうかは関係ありません。現時点で関連する最新のものを取得することが重要です)
ロックフリー状態転送を実装するには、その3つのインスタンス(同じクラスの異なるオブジェクト)が必要です。
var ReadItem: TLockFreeWorkState; CurrentItem: TLockFreeWorkState; WriteItem: TLockFreeWorkState;
アイデアは次のとおりです。ワークフローはWriteItemオブジェクトに自由にアクセスできます。 すべてのデータが保存されると、InterlockedExchange操作がCurrentItemのオブジェクトで実行され、その後、メインスレッドに新しい状態の準備ができたことが通知されます(この例では、通常のPostMessageが使用されました)。 通知ハンドラーのメインスレッドは、ReadItemオブジェクトを使用してCurrentItemオブジェクトのInterlockedExchange操作を実行します。その後、ReadItemからデータを自由に読み取ることができます。
このような「バブル」が判明します。ステータスデータがWriteItemに表示され、ReadItemのCurrentItemを介して「ポップアップ」されます。 ちなみに、このような構造の基本クラスの通常の名前は思いつかなかったので、単にTLockFreeWorkStateを呼び出しました(誰かがもっと良いアイデアを持っているかもしれません)。
注意点が1つあります。メインスレッドはいつでも現在の状態に適用できます。 常にInterlockedExchangeを実行する場合は、現在の状態と以前の状態を交互に返します。
クラスの通常の最新フラグは、これを防ぐのに役立ちます。 状態を書き込むとき、ワークフローは常にWriteItem.Newest:= Trueを設定し、InterlockedExchangeの後、このフラグはCurrentItemになります。 最初のメインスレッドはCurrentItem.Newestをチェックし、Trueの場合のみ、InterlockedExchangeを実行し、ReadItem.Newestは直ちにFalseにリセットします。 メインスレッドからCurrentItem.Newestを読むのは安全だと思いましたが、正しくない場合は修正してください。
これはすべて、単純化されたコードの形式になっています(明確にするために、型のゴーストは省略されています)。
type TLockFreeWorkState = class public Newest: Boolean; end; function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean; begin if CurrentItem.Newest then begin ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem); ReadItem.Newest := False; Result := True; end else Result := False; end; procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState); begin WriteItem.Newest := True; WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem); end;
キューオブジェクト
いくつかの点でアプローチは似ていますが、実装のために最初に必要なオブジェクトは1つだけですが、それへの2つのリンクが必要です。
var ReadQueue: TLockFreeWorkQueue; WriteQueue: TLockFreeWorkQueue;
最初に、TLockFreeWorkQueueの単一のインスタンスが作成され、ReadQueue変数とWriteQueue変数に書き込まれます。 このクラスは循環バッファーであり、次の説明があります。
TLockFreeWorkQueue = class public Head: Integer; Tail: Integer; Items: array[0..QueueCapacity - 1] of TObject; end;
QueueCapacityは、リングバッファーの長さを決定する定数(ゼロより大きい)です。
アイテムがキューに追加されると、ワークフローはWriteQueue.Items [Tail]要素のInterlockedExchangeComparePointerを実行します。 この場合、要素はNilと比較され、成功した場合、追加される要素がNilに書き込まれます。 操作が成功した場合、Tail値は1増加し、QueueCapacityに達すると0にリセットされます。 ワーカースレッド(ライタースレッド)のみがこの変数にアクセスできるため、Tailを自由に操作できます。 また、この後、ワークフローはメインアイテムにアイテムがキューに表示されたことを通知する必要があります。 操作が失敗した場合、これはキューがいっぱいであることを意味しますが、それについては後で説明します。
メインスレッドは、ワーカーから通知されると、キューから要素を読み取るサイクルを開始します(実際、読み取りはいつでも開始できます)。 要素を取得するために、Nil値が書き込まれるReadQueue.Items [Head]要素に対してInterlockedExchangePointerが呼び出されます。 抽出されたアイテムがNilでない場合、Head値は1増加し、QueueCapacityに達すると0にリセットされます。
それでは、バッファオーバーフローのケースに対処しましょう。 新しい要素については、新しいキューオブジェクトを作成して書き込みを続けることができます。そのため、このオブジェクトをリーダースレッドで見つけられるように、現在のキューオブジェクトにリンクを渡す必要があります。 これを行うには、追加のNextQueueフィールドをクラスに追加します。
TLockFreeWorkQueue = class public Head: Integer; Tail: Integer; Items: array[0..QueueCapacity - 1] of TObject; NextQueue: TLockFreeWorkQueue; end;
ここで、InterlockedExchangeComparePointer要素を書き込むときにNilを返す場合(キューがいっぱい)、新しいNewWriteQueueキューオブジェクトを作成します:TLockFreeWorkQueue、それに追加する要素を書き込み、InterlockedExchangePointerをWriteQueue.NextQueue変数で実行し、最後にNewWriteQueueをWriteQue変数に保存します。 したがって、この操作の後、ReadQueueおよびWriteQueueの値はすでに異なるオブジェクトを参照しています。
メインスレッドで、空のキュー処理を追加する必要があります。 ReadQueue.Items [Head]要素のInterlockedExchangePointerを読み込んでNilを返す場合、InterlockedExchangePointer(ReadQueue.NextQueue、Nil)も実行するNextQueueフィールドを追加で確認する必要があります。 Non-Nilが返された場合、オブジェクトをNewReadQueueに保存し、現在のReadQueueを削除して、この変数をNewReadQueueに設定します。
以下は、キューにアイテムを追加するための簡略化されたコードです。
procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject); var NewWriteQueue: TLockFreeWorkQueue; begin if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail]), Item, Nil) = Nil then begin
そして、キューからアイテムを取得します。
function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject; var NewReadQueue: TLockFreeWorkQueue; begin Result := Nil; repeat Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil); if Result = Nil then begin
このコードでは、多少安全かもしれません。 NextQueueフィールドを使用する操作で、通常InterlockedExchangePointerを使用する必要があるかどうかはわかりませんが、直接読み取りおよび書き込みを実行しても安全な場合があります。
テストケース
簡単なコンソールの例とともに、作業コードとコームコードをネタバレの下で見ることができます。
テストケース program LockFreeTest; {$APPTYPE CONSOLE} {$R *.res} uses SysUtils, Classes, Windows, Messages;
通常の状況では、キューにアイテムが表示されると、できるだけ早くメインストリームによってアイテムを取得する必要があります。 ただし、キューのオーバーフローをテストするために、TWorkThread.FDebugReadQueueフィールドを追加しました。Falseに設定すると、メインスレッドがキューから読み取ることができなくなります(TWorkThread.Executeメソッドでは、定数TestQueueCountToFlush = 10が導入され、メインスレッドは要素を10個追加した後にのみ読み取ることができます)。
残念ながら、テストケースは単純すぎて、読み取り/書き込みユーティリティ関数内でストリームが切り替えられたときに、ストリーム間の読み取り/書き込みの衝突を生成しません。 しかし、ここでは、アルゴリズムのすべてのボトルネックをチェックすることが可能かどうか、およびコードを何に向ける必要があるかはわかりません。