現在Javaに存在するメモリモデルは、このコードにスレッドレースがない場合に、マルチスレッドコードの予想される実行順序を保証します。 また、コードを競合から保護するために、コード間でデータを同期および交換するさまざまな方法が考案されました。
HotSpot JDKに含まれている
java.util.concurrent
パッケージは、マルチスレッドコードを記述するための次のツールを提供します。
- アトミック
- ロック
- コレクション
- 同期ポイント
- 執行者
- アキュムレータ_jdk 1.8_
アトミック
java.util.concurrent.atomic
子パッケージには、プリミティブ型のアトミックな作業のためのクラスのセットが含まれています。 これらのクラスのコントラクトは、「1プロセッサ時間単位」の
compare-and-set
操作のパフォーマンスを保証します。 この変数に新しい値を設定する場合、古い値も渡します(楽観的ロックのアプローチ)。 メソッドが呼び出された時点から、変数の値が予想された値と異なる場合、実行結果は
false
になり
false
。
たとえば、2つの
long
変数の配列
[1,2,3,4,5]
および
[-1,-2,-3,-4,-5]
ます。 各スレッドは配列に対して順次反復され、要素を1つの変数にまとめます。 悲観的なロックを使用したコード(
groovy )は次のようになります。
class Sum { static monitor = new Object() static volatile long sum = 0 } class Summer implements Callable { long[] data Object call() throws Exception { data.each { synchronized (Sum.monitor) { println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}") Sum.sum += it } } } } Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: ${Sum.sum}")
実行結果が期待されます。
pool-1-thread-1: add 1 to 0 pool-1-thread-2: add -1 to 1 pool-1-thread-1: add 2 to 0 pool-1-thread-2: add -2 to 2 pool-1-thread-1: add 3 to 0 pool-1-thread-2: add -3 to 3 pool-1-thread-1: add 4 to 0 pool-1-thread-1: add 5 to 4 pool-1-thread-2: add -4 to 9 pool-1-thread-2: add -5 to 5 Sum: 0
ただし、このアプローチにはパフォーマンス上の重大な欠点があります。 この場合、役に立たない作業が増えると、役に立つよりも多くのリソースが必要になります。
- モニターをロックしようとする
- フローブロッキング
- モニターのロック解除
- スレッドロック解除
同じ量を計算する場合、
AtomicLong
を使用して楽観的ロックを実装することを検討してください。
class Sum { static volatile AtomicLong sum = new AtomicLong(0) } class Summer implements Callable { long[] data Object call() throws Exception { data.each { while(true) { long localSum = Sum.sum.get() if (Sum.sum.compareAndSet(localSum, localSum + it)) { println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}") break; } else { println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}") } } } } } Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: ${Sum.sum}")
「誤った」試みの結果からわかるように、それほど多くはありませんでした。
[MISS!] pool-1-thread-1: add 1 to -1 pool-1-thread-2: add -1 to -1 pool-1-thread-2: add -2 to -3 [MISS!] pool-1-thread-1: add 1 to -3 pool-1-thread-2: add -3 to -6 pool-1-thread-1: add 1 to -5 [MISS!] pool-1-thread-2: add -4 to -5 pool-1-thread-1: add 2 to -7 pool-1-thread-2: add -4 to -7 pool-1-thread-1: add 3 to -9 pool-1-thread-2: add -5 to -9 pool-1-thread-1: add 4 to -5 pool-1-thread-1: add 5 to 0 Sum: 0
楽観的ロックを使用することを決定する場合、変更される変数を使用したアクションにあまり時間がかからないことが重要です。 このアクションが長ければ長いほど、誤った
compare-and-set
が頻繁に発生し、このアクションを再度実行しなければならない頻度が高くなります。
compare-and-set
に基づいて、非ブロッキング読み取りロックも実装できます。 この場合、アトミック変数には、処理中のオブジェクトのバージョンが格納されます。 計算前にバージョン値を受け取ったので、計算後に検証できます。 通常の
read-write
ロックは、バージョン検証が失敗した場合にのみ有効になります。
class Transaction { long debit } class Account { AtomicLong version = new AtomicLong() ReadWriteLock readWriteLock = new ReentrantReadWriteLock() List<Transaction> transactions = new ArrayList<Transaction>() } long balance(Account account) { ReentrantReadWriteLock.ReadLock locked while(true) { long balance = 0 long version = account.version.get() account.transactions.each {balance += it.debit}
ロック
再入可能ロック
同期ロックとは異なり、
ReentrantLock
使用
ReentrantLock
と、ロックの取り外しと受け取りの
ReentrantLock
をより柔軟に選択できます。 通常のJava呼び出しを使用します。 また、
ReentrantLock
使用すると、ロックの現在の状態に関する情報を取得したり、一定時間ロックを「待機」したりできます。 単一のスレッドのロックの正しい再帰的な取得と解放をサポートします。 正直なロックが必要な場合(モニターのキャプチャ時に順序を維持する)
ReentrantLock
もこのメカニズムが備わっています。
ReentrantLock
ロックと
ReentrantLock
ロックは非常に似ているという事実にもかかわらず、JVMレベルでの実装はまったく異なります。
JMMの詳細に入らない場合:JVMが提供する同期ロックの代わりに
ReentrantLock
を使用することは、モニターのスレッドの戦いが頻繁にある場合にのみ価値があります。 1つのスレッドのみが同期メソッドにアクセスする場合、ReentrantLockパフォーマンス
ReentrantLock
JVMロックメカニズムよりも劣ります。
ReentrantReadWriteLock
ReentrantLock
プロパティ
ReentrantLock
補完
ReentrantLock
多くの読み取りおよび書き込みロックをキャプチャ
ReentrantLock
機能を追加します。 必要に応じて、読み取りロックの前に書き込みロックを「下げる」ことができます。
StampedLock _jdk 1.8_
楽観的および悲観的読み取り/書き込みロックを実装し、さらに増加または減少する可能性があります。 楽観的ロックは、ロックの「スタンプ」(
javadoc )によって実装されます。
double distanceFromOriginV1() {
コレクション
ArrayBlockingQueue
あるスレッドから別のスレッドにメッセージを送信するための正直なキュー。 ブロック(
put()
take()
)および非ブロック(
offer()
pool()
)メソッドをサポートします。 null値を禁止します。 キューの容量は、作成時に指定する必要があります。
並行ハッシュマップ
hash
関数に基づくキーと値の構造。 読み取りロックはありません。 記録時には、カードの一部(セグメント)のみがブロックされます。 セグメントの数は、
concurrencyLevel
最も近い次数2に制限され
concurrencyLevel
。
ConcurrentSkipListMap
バランスの取れたマルチスレッドのキーと値の構造(O(log n))。 検索はスキップされたリストに基づいています。 カードはキーを比較できる必要があります。
ConcurrentSkipListSet
値のない
ConcurrentSkipListMap
。
CopyOnWriteArrayList
書き込みブロック、非読み取りブロックのリスト。 変更すると、メモリ内に配列の新しいインスタンスが作成されます。
CopyOnWriteArraySet
値なしの
CopyOnWriteArrayList
。
遅延キュー
PriorityBlockingQueue
、特定の遅延(オブジェクトの
Delayed
インターフェイスを介して宣言された遅延)の後にのみ要素を受信できるようにします。
DelayQueue
を使用して、スケジューラを実装できます。 キューの容量は固定されていません。
LinkedBlockingDeque
接続性に基づいた双方向
BlockingQueue
(キャッシュミスとキャッシュコヒーレンスのオーバーヘッド)。 キューの容量は固定されていません。
LinkedBlockingQueue
接続性に基づく単方向
BlockingQueue
(キャッシュミスとキャッシュコヒーレンスのオーバーヘッド)。 キューの容量は固定されていません。
LinkedTransferQueue
接続性に基づく単方向の `BlockingQueue`(キャッシュミスとキャッシュコヒーレンスのオーバーヘッド)。 キューの容量は固定されていません。 このキューにより、ハンドラーが要素を取得するのを待つことができます。
PriorityBlockingQueue
(要素の比較を通じて)メッセージに優先順位を付けることができる、単方向の `BlockingQueue`。 null値を禁止します。
SynchronousQueue
put()
メソッドの
transfer()
ロジックを実装する単方向の `BlockingQueue`。
同期ポイント
CountDownLatch
countDown()
への特定の(またはそれ以上の)呼び出し回数を期待するバリア(
await()
countDown()
。 バリアの状態はリセットできません。
CyclicBarrier
他のスレッドによる特定の数の
await()
呼び出しを期待するバリア(
await()
)。 スレッドの数が指定に達すると、オプションのコールバックが呼び出され、ロックが解除されます。 バリアは、保留中のフローが解放されて再利用できるようになると、状態を初期状態にリセットします。
交換機
2つのスレッドを同期するためのバリア( `exchange()`)。 同期時には、スレッド間でオブジェクトを揮発的に転送できます。
フェイザー
拡張機能「CyclicBarrier」。バリアの各サイクルの参加者を登録および削除できます。
セマフォ
指定された数のスレッドのみがモニターをキャプチャできるようにするバリア。 基本的に、「ロック」の機能をブロック内の機能を複数のスレッドに拡張します。
執行者
ExecutorService
は
new Thread(runnable)
を置き換えて、スレッドでの作業を簡素化しました。
ExecutorService
は、解放されたスレッドを再利用し、スレッドプールのタスクからキューを編成し、タスクの結果をサブスクライブするのに役立ちます。
Runnable
インターフェースの代わりに、プールは
Callable
インターフェースを使用します(結果を返し、エラーをスローできます)。
ExecutorService pool = Executors.newFixedThreadPool(4) Future future = pool.submit(new Callable() { Object call() throws Exception { println("In thread") return "From thread" } }) println("From main") println(future.get()) try { pool.submit(new Callable() { Object call() throws Exception { throw new IllegalStateException() } }).get() } catch (ExecutionException e) {println("Got it: ${e.cause}")} pool.shutdown()
invokeAll
メソッドは、すべてのタスクの完了時にのみ呼び出しスレッドに制御を渡します。
invokeAny
メソッドは、最初に正常に完了したタスクの結果を返し、後続のタスクをすべてキャンセルします。
ThreadPoolExecutor
プール内の作業スレッドとスレッドの最大数、タスクのキューを指定する機能を備えたスレッドのプール。
ScheduledThreadPoolExecutor
ThreadPoolExecutor
の機能を拡張し、遅延または定期的にタスクを実行します。
ThreadPoolExecutor
自己複製タスク用の軽いスレッドプール。 プールは、親の子タスクの `fork()`および `join()`メソッドの呼び出しを期待しています。
class LNode { List<LNode> childs = [] def object } class Finder extends RecursiveTask<LNode> { LNode node Object expect protected LNode compute() { if (node?.object?.equals(expect)) { return node } node?.childs?.collect { new Finder(node: it, expect: expect).fork() }?.collect { it.join() }?.find { it != null } } } ForkJoinPool es = new ForkJoinPool() def invoke = es.invoke(new Finder( node: new LNode( childs: [ new LNode(object: "ivalid"), new LNode( object: "ivalid", childs: [new LNode(object: "test")] ) ] ), expect: "test" )) print("${invoke?.object}")
アキュムレータ_jdk 1.8_
バッテリーを使用すると、CASを使用せずにマルチスレッド環境で数値要素に対してプリミティブ操作(合計/最大値の検索)を実行できます。