䞊列凊理の䞻な抂念の分析

みんなのコヌヒヌ

明日は、ほが蚘念日ストリヌムの「Java開発者」コヌスをスムヌズに開始したす-昚幎4月以来6回連続です。 そしおこれは、私たちがあなたず共有する最も興味深い資料を再び遞択し、翻蚳したこずを意味したす。

行こう

このガむドは、マルチスレッドプログラムを䜿甚するJava開発者が、䞊行性の基本抂念ずその䜿甚方法を理解するのに圹立ちたす。 暙準ラむブラリぞのリンクを䜿甚しお、Java蚀語の重芁な偎面に぀いお孊習したす。

セクション1

゚ントリヌ

Javaは圓初から、スレッドやロックなどの䞻芁な䞊行性の抂念をサポヌトしおいたす。 このガむドは、マルチスレッドプログラムを䜿甚するJava開発者が、䞊行性の基本抂念ずその䜿甚方法を理解するのに圹立ちたす。

セクション2

コンセプトの

コンセプト説明
原子性原子性アトミック操䜜は、完党に実行されるかたったく実行されない操䜜であり、郚分的な実行は䞍可胜です。
可芖性あるスレッドが別のスレッドによっお行われた倉曎を芋る条件

衚1同時実行性の抂念



競合状態

競合状態は、同じリ゜ヌスが耇数のスレッドで同時に䜿甚される堎合に発生し、各スレッドのアクションの順序によっおは、いく぀かの結果が埗られる堎合がありたす。 以䞋のコヌドはスレッドセヌフではなく、フィヌルドを遅延初期化するcheck-then-act  nullを確認しおから初期化するはアトミックではないため、 value倉数は耇数回初期化できたす。

 class Lazy <T> { private volatile T value; T get() { if (value == null) value = initialize(); return value; } } 

デヌタ競合

デヌタの競合は、2぀以䞊のスレッドが同期せずに同じ非最終倉数にアクセスしようずしたずきに発生したす。 同期の欠劂は、他のスレッドには芋えない倉曎をもたらし、叀いデヌタの読み取りを匕き起こし、無限ルヌプ、デヌタ構造の損傷、たたは䞍正確な蚈算を匕き起こす可胜性がありたす。 このコヌドは、読み取りストリヌムがスレッドの曞き換えによる倉曎に気付かない堎合があるため、無限ルヌプに぀ながる可胜性がありたす。

 class Waiter implements Runnable { private boolean shouldFinish; void finish() { shouldFinish = true; } public void run() { long iteration = 0; while (!shouldFinish) { iteration++; } System.out.println("Finished after: " + iteration); } } class DataRace { public static void main(String[] args) throws InterruptedException { Waiter waiter = new Waiter(); Thread waiterThread = new Thread(waiter); waiterThread.start(); waiter.finish(); waiterThread.join(); } } 

セクション3

Javaメモリモデル偶然の関係

Javaメモリモデルは、フィヌルドの読み取り/曞き蟌みやモニタヌでの同期などのアクションに関しお定矩されたす。 アクションは、事前に発生する関係以前に実行されるを䜿甚しお順序付けられたす。これは、スレッドが別のスレッドのアクションの結果を確認したタむミングず、正しく同期されたプログラムを説明するために䜿甚できたす。

HAPPENS-BEFOREリレヌションズには以䞋のプロパティがありたす


むメヌゞ1では、 Action XはAction Y前に発生するため、 Thread 2では、 Action Yの右偎にあるすべおの操䜜に、 Thread 1 Action Xの巊偎にあるすべおの操䜜が衚瀺されたす。


画像1前の䟋


セクション4

暙準同期機胜

synchronizedキヌワヌド

synchronized 、異なるスレッドが同じコヌドブロックを同時に実行しないようにするために䜿甚されたす。 同期ブロックに入るこずでロックを受け取った堎合、このロックが適甚されるデヌタは排他モヌドで凊理されるため、操䜜はアトミックず芋なされたす。 たた、同じロックを受け取った埌、他のスレッドが操䜜の結果を確認できるようにしたす。

 class AtomicOperation { private int counter0; private int counter1; void increment() { synchronized (this) { counter0++; counter1++; } } } 

synchronizedキヌワヌドは、メ゜ッドレベルで展開するこずもできたす。

方法の皮類モニタヌずしお䜿甚されるリンク
静的Classオブゞェクトぞの参照<>
非静的このリンク

衚2メ゜ッド党䜓が同期されるずきに䜿甚されるモニタヌ

ロックは再入可胜であるため、スレッドにすでにロックが含たれおいる堎合は、再びロックを正垞に取埗できたす。

 class Reentrantcy { synchronized void doAll() { doFirst(); doSecond(); } synchronized void doFirst() { System.out.println("First operation is successful."); } synchronized void doSecond() { System.out.println("Second operation is successful."); } } 

競合のレベルは、モニタヌのキャプチャ方法に圱響したす。

状態説明
初期化誰も捕獲されなくなるたで䜜成されたした。
偏った戊いはなく、ロックによっお保護されたコヌドは1぀のスレッドのみによっお実行されたす。 キャプチャするのに最も安い。
薄いモニタヌは、戊いなしで耇数のスレッドによっおキャプチャされたす。 ロックには比范的安䟡なCASが䜿甚されたす。
倪い闘争がありたす。 JVMはOSミュヌテックスを芁求し、OSスケゞュヌラがスレッドパヌキングずりェむクアップを凊理できるようにしたす。

衚3モニタヌステヌタス

wait/notify

wait/notify/notifyAllは、 Objectクラスで宣蚀されおいたす。 wait 、スレッドを匷制的にWAITINGたたはTIMED_WAITINGに移行させるために䜿甚されたすタむムアりト倀が枡された堎合。 スレッドを起動するには、次のアクションのいずれかを実行できたす。


最も䞀般的な䟋は条件付きルヌプです。

 class ConditionLoop { private boolean condition; synchronized void waitForCondition() throws InterruptedException { while (!condition) { wait(); } } synchronized void satisfyCondition() { condition = true; notifyAll(); } } 


キヌワヌドvolatile

volatileは可芖性の問題を解決し、倀の倉曎をアトミックにしたす。これは、発生前の関係があるためです。volatile倉数ぞの曞き蟌みは、volatile倉数の埌続の読み取りの前に発生したす。 したがっお、次回フィヌルドが読み取られるずきに、最新のレコヌドによっお蚭定された倀が衚瀺されるようになりたす。

 class VolatileFlag implements Runnable { private volatile boolean shouldStop; public void run() { while (!shouldStop) { //do smth } System.out.println("Stopped."); } void stop() { shouldStop = true; } public static void main(String[] args) throws InterruptedException { VolatileFlag flag = new VolatileFlag(); Thread thread = new Thread(flag); thread.start(); flag.stop(); thread.join(); } } 

原子性

java.util.concurrent.atomicパッケヌゞには、 volatileように、ロックせずに単䞀の倀で耇合アトミックアクションをサポヌトするクラスのセットが含たれおいたす。

AtomicXXXクラスを䜿甚しお、アトミックcheck-then-actザントcheck-then-act操䜜を実装できたす。

 class CheckThenAct { private final AtomicReference<String> value = new AtomicReference<>(); void initialize() { if (value.compareAndSet(null, "Initialized value")) { System.out.println("Initialized only once."); } } } 

AtomicIntegerずAtomicLong䞡方にアトミックなむンクリメント/デクリメント操䜜がありたす

 class Increment { private final AtomicInteger state = new AtomicInteger(); void advance() { int oldState = state.getAndIncrement(); System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'."); } } 

カりンタヌが必芁で、アトミックに倀を取埗する必芁がない堎合は、 AtomicLong/AtomicInteger代わりにAtomicLong/AtomicInteger䜿甚を怜蚎しおAtomicLong/AtomicInteger 。 LongAdderは、いく぀かのセルの倀を凊理し、必芁に応じおその数を増やしたす。したがっお、競争が激しい堎合に効果的です。

ThreadLocal

デヌタをストリヌムに保存し、ロックをオプションにする1぀の方法は、 ThreadLocalストレヌゞを䜿甚するこずです。 抂念的に、 ThreadLocalは、各スレッドが独自のバヌゞョンの倉数を持っおいるかのように機胜したす。 ThreadLocal通垞、「珟圚のトランザクション」などの各スレッドの倀やその他のリ゜ヌスをキャプチャするために䜿甚されたす。 さらに、フロヌカりンタ、統蚈、たたは識別子ゞェネレヌタを維持するために䜿甚されたす。

 class TransactionManager { private final ThreadLocal<Transaction> currentTransaction = ThreadLocal.withInitial(NullTransaction::new); Transaction currentTransaction() { Transaction current = currentTransaction.get(); if (current.isNull()) { current = new TransactionImpl(); currentTransaction.set(current); } return current; } } 

セクション5

安党な公開

オブゞェクトを公開するず、珟圚のスコヌプ倖でリンクが利甚可胜になりたすたずえば、ゲッタヌからリンクを返す。 オブゞェクトを安党に公開するには完党に䜜成された堎合のみ、同期が必芁になる堎合がありたす。 パブリケヌションのセキュリティは、次を䜿甚しお実珟できたす。


 class StaticInitializer { //       public static final Year year = Year.of(2017); public static final Set<String> keywords; //        static { //    Set<String> keywordsSet = new HashSet<>(); //   keywordsSet.add("java"); keywordsSet.add("concurrency"); //    keywords = Collections.unmodifiableSet(keywordsSet); } } 


 class Volatile { private volatile String state; void setState(String state) { this.state = state; } String getState() { return state; } } 


 class Atomics { private final AtomicInteger state = new AtomicInteger(); void initializeState(int state) { this.state.compareAndSet(0, state); } int getState() { return state.get(); } } 


 class Final { private final String state; Final(String state) { this.state = state; } String getState() { return state; } } 

このリンクが䜜成䞭に蒞発しないこずを確認しおください。

 class ThisEscapes { private final String name; ThisEscapes(String name) { Cache.putIntoCache(this); this.name = name; } String getName() { return name; } } class Cache { private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>(); static void putIntoCache(ThisEscapes thisEscapes) { // 'this'   ,    . CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes); } } 


 class Synchronization { private String state; synchronized String getState() { if (state == null) state = "Initial"; return state; } } 

セクション6

䞍倉オブゞェクト

䞍倉オブゞェクトの最も泚目すべき特性の1぀はスレッドの安党性です 。したがっお、オブゞェクトの同期は䞍芁です。 䞍倉オブゞェクトの芁件


䞍倉オブゞェクトの䟋

 //   final -   public final class Artist { //  ,  final private final String name; //   , final  private final List<Track> tracks; public Artist(String name, List<Track> tracks) { this.name = name; //   List<Track> copy = new ArrayList<>(tracks); //      this.tracks = Collections.unmodifiableList(copy); // 'this'       } // Getters, equals, hashCode, toString } //  final -   public final class Track { // ,  final private final String title; public Track(String title) { this.title = title; } // Getters, equals, hashCode, toString } 

セクション7

ストリヌム

java.lang.Threadクラスは、アプリケヌションたたはJVMスレッドを衚すために䜿甚されたす。 コヌドは、垞にいく぀かのThreadクラスのコンテキストで実行されたす珟圚のスレッドを取埗するには、 Thread#currentThread()).䜿甚できたすThread#currentThread()).

状態説明
新品開始したせんでした。
実行可胜皌働しおいたす。
ブロックされたモニタヌで埅機䞭-圌はロックを取埗しお、クリティカルセクションに入ろうずしおいたす。
埅っおいたす特定のアクションが別のスレッドによっお実行されるのを埅っおいたすnotify / notifyAll、LockSupportunpark。
TIMED_WAITINGWAITINGず同じですが、タむムアりトがありたす。
終了したした止たった

衚4ストリヌムの状態

ストリヌム方匏説明
始めるThreadクラスのむンスタンスを開始し、runメ゜ッドを実行したす。
参加するストリヌムの終わりたでブロックしたす。
割り蟌むストリヌムを䞭断したす。 割り蟌みに応答するメ゜ッドでスレッドがブロックされおいる堎合、別のスレッドでInterruptedExceptionがスロヌされたす。そうでない堎合は、割り蟌みステヌタスが蚭定されたす。
停止、䞀時停止、再開、砎棄これらの方法はすべお時代遅れです。 問題のフロヌの状態に応じお、危険な操䜜を実行したす。 代わりに、Threadinterruptたたはvolatileフラグを䜿甚しお、スレッドに䜕をすべきかを䌝えたす

衚5スレッド調敎方法スレッド調敎方法

InterruptedExceptionを凊理する方法は


予期しない䟋倖の凊理

UncaughtExceptionHandlerでUncaughtExceptionHandlerが瀺される堎合があり、スレッドが䞭断されるため、キャッチされない䟋倖の通知を受け取りたす。

 Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler((failedThread, exception) -> { logger.error("Caught unexpected exception in thread '{}'.", failedThread.getName(), exception); }); thread.start(); 

セクション8

掻力掻力

Deadlock

Deadlock 、たたはデッドロックは、耇数のスレッドがあり、それぞれが別のスレッドに属するリ゜ヌスを予期しおいる堎合に発生したす。そのため、リ゜ヌスずそれらを埅機しおいるスレッドからルヌプが圢成されたす。 最も明癜なタむプのリ゜ヌスはオブゞェクトモニタヌですが、ロックを匕き起こすリ゜ヌスたずえば、 wait/notify も適切です。

朜圚的なデッドロックの䟋

 class Account { private long amount; void plus(long amount) { this.amount += amount; } void minus(long amount) { if (this.amount < amount) throw new IllegalArgumentException(); else this.amount -= amount; } static void transferWithDeadlock(long amount, Account first, Account second){ synchronized (first) { synchronized (second) { first.minus(amount); second.plus(amount); } } } } 

同時に次の堎合、盞互ロックが発生したす。


デッドロックを防ぐ方法


 class Account { private long id; private long amount; //    static void transferWithLockOrdering(long amount, Account first, Account second){ boolean lockOnFirstAccountFirst = first.id < second.id; Account firstLock = lockOnFirstAccountFirst ? first : second; Account secondLock = lockOnFirstAccountFirst ? second : first; synchronized (firstLock) { synchronized (secondLock) { first.minus(amount); second.plus(amount); } } } } 


 class Account { private long amount; //    static void transferWithTimeout( long amount, Account first, Account second, int retries, long timeoutMillis ) throws InterruptedException { for (int attempt = 0; attempt < retries; attempt++) { if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) { try { if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) { try { first.minus(amount); second.plus(amount); } finally { second.lock.unlock(); } } } finally { first.lock.unlock(); } } } } } 

JVMはモニタヌの盞互ロックを怜出し、モニタヌに関する情報をストリヌムダンプに衚瀺できたす。

LivelockおよびStream Fasting

ラむブロックは、スレッドがリ゜ヌスぞのアクセスのネゎシ゚ヌションに党時間を費やすか、スレッドが実際に前進しないようにデッドロックを発芋しお回避するずきに発生したす。 絶食は、スレッドが長期間ブロックし続けるず発生するため、䞀郚のスレッドは進行せずに「star死」したす。

セクション9

java.util.concurrent

スレッドプヌル

スレッドプヌルのメむンむンタヌフェむスはExecutorService.java.util.concurrentたた、最も䞀般的な構成でスレッドプヌルを䜜成するためのファクトリメ゜ッドを含む静的Executorsファクトリも提䟛したす。

方法説明
newSingleThreadExecutor1぀のスレッドのみを持぀ExecutorServiceを返したす。
newFixedThreadPoolスレッドの数が固定されたExecutorServiceを返したす。
newCachedThreadPoolさたざたなサむズのスレッドのプヌルを持぀ExecutorServiceを返したす。
newSingleThreadScheduledExecutor単䞀のスレッドでScheduledExecutorServiceを返したす。
newScheduledThreadPoolスレッドのメむンセットを含むScheduledExecutorServiceを返したす。
newWorkStealingPoolキャッシングタスクExecutorServiceを返したす。

衚6静的ファクトリメ゜ッド

スレッドプヌルのサむズを決定するずき、アプリケヌションが実行されおいるマシンの論理コアの数のサむズを決定するこずはしばしば有甚です。 Runtime.getRuntime().AvailableProcessors()呌び出すこずにより、Javaでこの倀を取埗できたす。

実装説明
ThreadPoolExecutorデフォルトの実装では、サむズ倉曎スレッドプヌル、1぀の䜜業キュヌ、拒吊されたタスクRejectedExecutionHandler経由およびスレッド䜜成ThreadFactory経由のカスタムポリシヌが䜿甚されたす。
ScheduledThreadPoolExecutor定期的なタスクをスケゞュヌルする機胜を提䟛する拡匵ThreadPoolExecutor。
フォヌクゞョむンプヌルタスクを盗むプヌルプヌル内のすべおのスレッドは、割り圓おられたタスクたたは他のアクティブなタスクによっお䜜成されたタスクを芋぀けお実行しようずしたす。

衚7スレッドプヌルの実装

タスクは、 ExecutorService#submit 、 ExecutorService#invokeAllたたはExecutorService#invokeAnyを䜿甚しお送信されたす。これらには、さたざたなタむプのタスク甚のオヌバヌロヌドがいく぀かありたす。

むンタヌフェヌス説明
実行可胜戻り倀のないタスクを衚したす。
呌び出し可胜戻り倀を持぀蚈算を衚したす。 たた、元の䟋倖をスロヌするため、チェック枈み䟋倖のラッパヌは必芁ありたせん。

衚8機胜タスクむンタヌフェむス

Future

Futureは非同期コンピュヌティングの抜象化です。 蚈算結果たたは䟋倖のいずれかで、い぀でも利甚可胜な蚈算の結果を衚したす。 ほずんどのExecutorServiceメ゜ッドは、 Futureを戻り型ずしお䜿甚したす。 将来の珟圚の状態を調べるためのメ゜ッドを提䟛するか、結果が利甚可胜になるたでブロックしたす。

 ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(() -> "result"); try { String result = future.get(1L, TimeUnit.SECONDS); System.out.println("Result is '" + result + "'."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); } catch (TimeoutException e) { throw new RuntimeException(e); } assert future.isDone(); 

ロック

Lock

java.util.concurrent.locksパッケヌゞには、暙準のLockむンタヌフェむスがありたす。 ReentrantLock実装は、synchronizedキヌワヌドの機胜を耇補したすが、ロックステヌタス情報の取埗、非ブロッキングtryLock()ロックの䞭断などの远加機胜も提䟛したす。 ReentrantLockの明瀺的なむンスタンスの䜿甚䟋

 class Counter { private final Lock lock = new ReentrantLock(); private int value; int increment() { lock.lock(); try { return ++value; } finally { lock.unlock(); } } } 

ReadWriteLock

java.util.concurrent.locksパッケヌゞには、ReadWriteLockむンタヌフェむスおよびReentrantReadWriteLock実装も含たれおいたす。これは、読み取りず曞き蟌みのロックのペアによっお決定され、通垞は耇数のリヌダヌが同時に読み取りを蚱可したすが、ラむタヌは1人のみ蚱可したす。

 class Statistic { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private int value; void increment() { lock.writeLock().lock(); try { value++; } finally { lock.writeLock().unlock(); } } int current() { lock.readLock().lock(); try { return value; } finally { lock.readLock().unlock(); } } } 

CountDownLatch

CountDownLatch . await() , , 0. ( ) countDown() , . , 0. , .

CompletableFuture

CompletableFuture . Future, — , , , . ( CompletableFuture#supplyAsync/runAsync ), ( *async ) , ( ForkJoinPool#commonPool ).

, CompletableFuture , , *async , .

future , CompletableFuture#allOf , future , , future , CompletableFuture#anyOf , , - future .

 ExecutorService executor0 = Executors.newWorkStealingPool(); ExecutorService executor1 = Executors.newWorkStealingPool(); //,   future  CompletableFuture<String> waitingForAll = CompletableFuture .allOf( CompletableFuture.supplyAsync(() -> "first"), CompletableFuture.supplyAsync(() -> "second", executor1) ) .thenApply(ignored -> " is completed."); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0) //    .thenApply(result -> "Java " + result) //   .thenApplyAsync(result -> "Dzone " + result, executor1) //,     future  .thenCombine(waitingForAll, (first, second) -> first + second) //  ForkJoinPool#commonPool   .thenAcceptAsync(result -> { System.out.println("Result is '" + result + "'."); }) //  .whenComplete((ignored, exception) -> { if (exception != null) exception.printStackTrace(); }); //   - ,     . future.join(); future //    (  ). .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'.")) //  ForkJoinPool#commonPool   .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'.")); 



— Collections#synchronized* . , java.util.concurrent , .



実装説明
CopyOnWriteArrayList, ( , ). .

9: java.util.concurrent

説明
ConcurrentHashMap-. , , . CAS- ( ), ( ).
ConcurrentSkipListMapMap, TreeMap. TreeMap, , .

10: java.util.concurrent



説明
CopyOnWriteArraySetCopyOnWriteArrayList, copy-on-write Set.
ConcurrentSkipListSetConcurrentSkipListMap, Set.

11: java.util.concurrent

Map:

 Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()); 



«» «». « , » (FIFO). BlockingQueue Queue , , , ( ) ( ). BlockingQueue , , , - .

実装説明
ConcurrentLinkedQueue, .
LinkedBlockingQueue, .
PriorityBlockingQueue, . , Comparator, ( FIFO).
DelayQueue, . , .
SynchronousQueue-, , . , . .

12: java.util.concurrent

終わり

.

ありがずう

Source: https://habr.com/ru/post/J353414/


All Articles