CompletableFutureを複数のスレッドで使用する問題とその解決策

画像 Java 8ではCompletableFuture新しいクラスが導入さCompletableFuture 、これにより非同期コードを簡単に記述できます。
複数のスレッドからCompletableFutureを使用するとき、その非自明な動作に遭遇しました。つまり、そのスレッドでのコールバックは、それらのスレッドではまったく期待どおりに実行できないということです。 このことと、どうやって問題を解決したかについて-この記事で説明します。

スレッドセーフなデータ構造を使用するサーバーに対して、非同期の非ブロッキングシングルスレッドクライアントを開発しました。 テストは問題なく合格しましたが、ベンチマークは、シングルスレッドクライアントの内部構造でConcurrentModificationExceptionが発生することがありました。

クライアントの非同期はCompletableFutureを使用して実装され、クライアント内のすべての操作は1つのスレッド(以降、コードsingleThreadExecutor )で実行されました。

ユーザーが使用できるgetメソッドを使用したクライアントコードのフラグメント:

 //   private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>()); public CompletableFuture<String> get(String key) { CompletableFuture<String> future = new CompletableFuture<>(); //       singleThreadExecutor.execute(() -> { // future     pendingFutures.add(future); future.whenComplete((v, e) -> { // future       pendingFutures.remove(future); }); //            //     future.complete(data);    singleThreadExecutor }); return future; } 

これは行われるべきではないことが判明しました。

CompletableFutureのjavadocを注意深く読んでいれば、おそらくこれについては以前に知っていただろう。

javadocを表示
非非同期メソッドの依存する完了のために提供されるアクションは、現在のCompletableFutureを完了するスレッドによって、または完了メソッドのその他の呼び出し元によって実行されます。

このアーキテクチャを使用する場合、 CompletableFutureすべてのコールバックは、 CompletableFutureと同じスレッドで呼び出される必要があります。

上記のコードによると、それは起こっているようです。 ただし、ベンチマークは、同じクライアントスレッド( singleThreadExecutor )のpendingFuturespendingFuturesするコードのConcurrentModificationExceptionで終了する場合がありました。

実際、 future.whenCompletefuture.whenCompleteを呼び出す)に渡されるコールバックは、完全に異なるスレッドで実行される場合があります。 むしろ、私のクライアントが使用するアプリケーションのスレッドで:

 Client client = new Client("127.0.0.1", 8080); CompletableFuture<String> result = client.get(key); result.thenAccept(data -> { System.out.println(data); }); 

このアプリケーションでresult.thenAcceptを呼び出すと、クライアントコード自体の内部に追加された将来のコールバックの残りを呼び出すことがあります。

問題を簡単な例で見てみましょう。


 Thread mainThread = Thread.currentThread(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); future.complete(null); 

コールバックはcompleteメソッドと同じスレッドで実行されるため、このようなコードは常にtrueを表示しtrue

ただし、他のスレッドからCompletableFutureへの呼び出しが少なくとも1つある場合、動作が変わる可能性があります。

 //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread) }); //  callback    future    executor.execute(() -> { future.thenRun(() -> { //nop }); }); // future future.complete(null); 

そのようなコードは時々 false返すことがありfalse

実際には、同じ未来でthenRunを呼び出しthenRunが、2番目のスレッドで、最初のthenRunコールバックが発生する可能性があります。 この場合、最初のthenRunコールバックは2番目のスレッドで呼び出されます。

これは、 future.complete(null)が実行を開始した時点で発生しますが、まだコールバックを呼び出すことができず、 thenRun 2番目のスレッドでthenRunれました。

問題は簡単に解決されます。

 //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); //  callback    future    executor.execute(() -> { secondThreadFuture.thenRun(() -> { //nop }); }); // future future.complete(null); 

最初のfutureの結果に依存するsecondThreadFutureを追加しました。 そして、2番目のスレッドでthenRunを呼び出しても、元のfutureでコールバックがトリガーされる可能性はありません。

ユーザー定義のスレッドでのコールバックの呼び出しを保証するために、 CompletableFutureは、 thenRunAsyncに渡す必要のあるthenRunAsyncなどの非同期メソッドの実装があります。 ただし、メソッドの非同期バージョンは、通常のバージョンよりも実行速度が遅くなる場合があります。 したがって、私はそれらを再び使用したくありませんでした。

おわりに


私が自分でCompletableFuture結論:複数のスレッドで1つのCompletableFutureオブジェクトを使用しないでCompletableFutureオブジェクトのすべてのコールバックが特定のスレッドで実行されることを確認する必要がある場合。 また、1つのCompletableFutureで複数のストリームを使用する必要がある場合、元のCompletableFutureではなく、元のCompletableFutureに依存する新しいストリームに転送するだけで十分です。 たとえば、次のように:

 CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> { //nop }); 

Source: https://habr.com/ru/post/J325730/


All Articles