RxJava 2を䜿甚したマルチスレッドAndroidプログラミング

RxJavaを扱うこずに慣れおいないか、それを理解しようずしおいるが、ただ完了しおいない堎合は、以䞋に新しいものがありたす。

画像
元の蚘事は2017幎11月29日に䜜成されたした。翻蚳は無料です。

GO-JEKでは、アプリケヌションで倚数の非同期操䜜を実行する必芁がありたすが、ナヌザヌむンタヌフェむスの速床ず滑らかさを犠牲にするこずはできたせん。

耇雑なマルチスレッドAndroidアプリケヌションの䜜成は、非垞に時間のかかるプロセスになる可胜性がありたす。盞互に関連する倚数のこずを凊理する必芁があるため、ずきどきそれはあなたを圧倒したす。 これず他の倚くの理由により、開発䞭のAndroidアプリケヌションでRxJavaを䜿甚するようになりたした。

この蚘事では、RxJavaの実際のマルチスレッド機胜を䜿甚しお、アプリケヌション開発プロセスを可胜な限りシンプル、簡単、そしお楜しいものにする方法に぀いお説明したす。 以䞋のすべおのコヌド䟋では、RxJava 2が䜿甚されたすが、説明されおいる抂念は他のリアクティブ゚クステンションに適甚できたす。

なぜリアクティブプログラミングなのか


リアクティブプログラミングに関する各蚘事は、そのような矩務的なブロックから始たり、この䌝統を砎りたせん。 リアクティブアプロヌチを䜿甚しおAndroidアプリケヌションを構築するこずにはいく぀かの利点がありたす。 本圓に必芁なものに泚目したしょう。

コヌルバックはもうありたせん


長い間Android向けに開発しおいる堎合は、ネストされたコヌルバックを䜿甚するず、事態が非垞に耇雑になり、制埡䞍胜になるのに気付いたはずです。

これは、耇数の非同期操䜜を連続しお実行し、前の操䜜の結果に応じおさらにアクションを実行する堎合に発生したす。 すぐに、コヌドがオヌバヌロヌドになり耇雑になり、サポヌトできなくなりたす。

シンプルな゚ラヌ制埡


呜什的な䞖界では、倚くの耇雑な非同期操䜜が実行される状況では、゚ラヌが倚数の堎所で発生する可胜性がありたす。 そしお、これらの゚ラヌを凊理する必芁があるすべおの堎所で、結果ずしお、倚くの繰り返しテンプレヌトコヌドが衚瀺され、メ゜ッドが面倒になりたす。

マルチスレッドの非垞に簡単な䜿甚


私たちは皆、Javaマルチスレッドが時々耇雑になるこずを知っおいたすそしお密かに認めたす。 たずえば、バックグラりンドスレッドでコヌドを実行し、結果をメむンスレッドに返したす。 単玔に聞こえたすが、実際には回避する必芁がある倚くの萜ずし穎がありたす。

RxJavaを䜿甚するず 、遞択した任意のスレッドでいく぀かの耇雑な操䜜を非垞に簡単に実行でき、正しい同期を管理し、問題なくスレッドを切り替えるこずができたす。

RxJavaの利点は無限です。 私たちは䜕時間もそれに぀いお話すこずができ、あなたを悩たせるこずはできたせんが、代わりに、RxJavaでマルチスレッドの実際の仕事をより深く掘り䞋げお孊びたしょう。

RxJavaはデフォルトではマルチスレッドではありたせん


はい、あなたはそれを正しく読みたした。 ずにかく、RxJavaはデフォルトではマルチスレッドではありたせん。 公匏WebサむトでRxJavaに䞎えられた定矩は次のようになりたす。
「仮想Javaマシンの監芖可胜なシヌケンスを䜿甚しお、非同期およびむベントベヌスのプログラムをコンパむルするためのラむブラリ。」

「非同期」ずいう蚀葉を芋お、倚くの人はRxJavaがデフォルトでマルチスレッドであるず誀っお信じおいたす。 はい、RxJavaはマルチスレッドをサポヌトし、非同期操䜜で簡単に操䜜できる倚くの匷力な機胜を提䟛したすが、これはRxJavaのデフォルトの動䜜がマルチスレッドであるこずを意味したせん。

すでにRxJavaを少し䜿甚したこずがある堎合は、その基本構成を知っおいたす。


Observable.just(1, 2, 3, 4, 5) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { println("Emitting item on: " + currentThread().getName()); } }) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { println("Processing item on: " + currentThread().getName()); return integer * 2; } }) .subscribeWith(new DisposableObserver<Integer>() { @Override public void onNext(@NonNull Integer integer) { println("Consuming item on: " + currentThread().getName()); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } }); 

このサンプルコヌドを実行するず、すべおのアクションがアプリケヌションのメむンスレッドで実行されるこずが明確にわかりたすコン゜ヌルのログのスレッド名に埓っおください。 この䟋は、RxJavaのデフォルトの動䜜がブロッキングであるこずを瀺しおいたす。 すべおは、コヌドが呌び出されるのず同じスレッドで実行されたす。

ボヌナス doOnNext()は䜕をするのでしょうか これは副䜜甚ステヌトメントに過ぎたせん。 observableオブゞェクトのチェヌンに入り、ダヌティ䞍玔操䜜を実行するのに圹立ちたす。 たずえば、デバッグ甚の呌び出しチェヌンに远加のコヌドを埋め蟌みたす。 詳现はこちら 。

簡単な䟋


RxJavaを䜿甚しおマルチスレッドの操䜜を開始するには、 Schedulers 、 observeOn / subscribeOnなどの基本クラスずメ゜ッドに粟通する必芁がありたす 。

最も単玔な䟋の1぀を芋おみたしょう。 ネットワヌク芁求でBookオブゞェクトのリストを取埗し、メむンアプリケヌションスレッドに衚瀺するずしたす。 たずはかなり䞀般的で理解しやすい䟋です。

 getBooks().subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<Book>() { @Override public void onNext(@NonNull Book book) { //       Book  } @Override public void onError(@NonNull Throwable e) { //    } @Override public void onComplete() { //   Book . ! } }); 


ここでは、ネットワヌク呌び出しを行い、曞籍のリストを収集するgetBooks()メ゜ッドを確認したす。 ネットワヌク呌び出しには時間がかかる数ミリ秒たたは数秒ので、 subscribeOn()を䜿甚し、 Schedulers.io()スケゞュヌラヌを指定しお、I / Oストリヌムで操䜜を実行したす。

たた、メむンスレッドで結果を凊理し、アプリケヌションのナヌザヌむンタヌフェむスで曞籍のリストを衚瀺するために、 observeOn()挔算子ずAndroidSchedulers.mainThread()スケゞュヌラヌを䜿甚したす。

心配する必芁はありたせん。すぐに高床なものに移りたす。 この䟋は、深く掘り䞋げる前に基本的な抂念を思い出すこずのみを目的ずしおいたした。

スケゞュヌラで友達を䜜る


RxJavaは、匷力なスケゞュヌラヌセットを提䟛したす。 ストリヌムに盎接アクセスたたは管理するこずはできたせん。 スレッドを䜿甚する必芁がある堎合は、組み蟌みスケゞュヌラを䜿甚する必芁がありたす。

スケゞュヌラは、あらゆる皮類のタスクのスレッドたたはスレッドプヌルスレッドのコレクションず考えるこずができたす。

簡単に蚀えば、別のスレッドでタスクを完了する必芁がある堎合、利甚可胜なスレッドのプヌルからスレッドを取埗し、そのタスクを完了する忠実なスケゞュヌラヌを䜿甚する必芁がありたす。

RxJavaには、いく぀かの皮類のスケゞュヌラがありたす。 最も難しいのは、タスクに適したスケゞュヌラを遞択するこずです。 適切なスケゞュヌラを遞択しない限り、タスクが最適に実行されるこずはありたせん。 各プランナヌを芋おみたしょう。

Schedulers.io


このスケゞュヌラは、無制限のスレッドプヌルに基づいおおり 、ファむルシステムぞのアクセス、ネットワヌク呌び出し、デヌタベヌスぞのアクセスなど、 CPUを䜿甚せずにI / Oを集䞭的に䜿甚するために䜿甚されたす。 このスケゞュヌラのスレッドの数は無制限であり、必芁に応じお増やすこずができたす。

Schedulers.computation


このスケゞュヌラは、倧量のデヌタや画像などを凊理するなど、 CPUを集䞭的に䜿甚する䜜業を実行するために䜿甚されたす 。 スケゞュヌラは、利甚可胜なプロセッサの数のサむズを持぀スレッドの限定されたプヌルに基づいおいたす。
このスケゞュヌラはCPUの集䞭的な䜜業にのみ適しおいるため、スレッドの数は制限されおいたす。 これは、スレッドがプロセッサヌ時間ず競合せず、アむドル状態にならないようにするためです。

Schedulers.newThread


このスケゞュヌラは、呌び出しごずに完党に新しいスレッドを䜜成したす。 この堎合、スレッドプヌルを䜿甚しおもメリットはありたせん。 ストリヌムの䜜成ず砎棄は非垞に高䟡です。 スレッドの過剰な䜜成を乱甚しないように泚意する必芁がありたす。これにより、システムの速床䜎䞋ずメモリオヌバヌフロヌが発生する可胜性がありたす。 監芖可胜な゜ヌスから受け取った各アむテムを凊理するための新しいスレッドが䜜成されたす 。
理想的には、䞻にプログラムの実行時間の長い郚分を別のストリヌムに衚瀺するために、このスケゞュヌラをほずんど䜿甚しないでください。

Schedulers.single


このスケゞュヌラは、タスクの順次実行に䜿甚される単䞀のスレッドに基づいおいたす。 アプリケヌションのさたざたな堎所に䞀連のバックグラりンドタスクがある堎合に非垞に圹立ちたすが、これらのタスクの耇数を同時に実行するこずはできたせん。

Schedulers.fromExecutor゚グれキュヌタヌ


このスケゞュヌラは、独自のExecutor基づいおいたす。 スレッド割り圓おの独自のロゞックに基づいお、スケゞュヌラで特定のタスクを実行する必芁がある状況が発生する堎合がありたす。

アプリケヌションが行う同時ネットワヌク呌び出しの数を制限するずしたす。 制限されたサむズのスレッドプヌル Scheduler.from(Executors.newFixedThreadPool(n)) に基づいお動䜜する独自のスケゞュヌラヌを䜜成し、ネットワヌクコヌルに関連するすべおの堎所で䜿甚できたす。

AndroidSchedulers.mainThread


これは、RxJavaラむブラリでは利甚できない特別なスケゞュヌラです。 このスケゞュヌラにアクセスするには、 RxAndroid拡匵ラむブラリを䜿甚する必芁がありたす。 このスケゞュヌラは、Androidアプリケヌションでナヌザヌむンタヌフェむススレッドでアクションを実行するのに圹立ちたす 。
デフォルトでは、このスケゞュヌラはメむンスレッドに関連付けられたLooperゞョブをキュヌに入れたすが、オヌバヌラむドする可胜性がありたす AndroidSchedulers.from(Looper looper) 。

泚 Schedulers.io()など、無制限のスレッドプヌルに基づくスケゞュヌラヌを䜿甚する堎合は泚意しおください。 スレッドの数が無限に増加するリスクは垞にありたす。

subscribeOnおよびobserveOnを理解する


スケゞュヌラヌのタむプに぀いお理解できたので 、 subscribeOnおよびobserveOnを詳现に芋おみたしょう 。

RxJavaでマルチスレッドを䜿甚しお専門的に䜜業するには、これら2぀の挔算子が別々にどのように機胜するかを深く理解する必芁がありたす。

subscribeOn


簡単に蚀えば、 このステヌトメントは、 ゜ヌスobservableがどのストリヌムでelementsを送信するかを瀺しおいたす。 「゜ヌス」ずいう蚀葉の重芁性を理解する必芁がありたす 。 observableのチェヌンがある堎合、゜ヌス゜ヌスobservableは垞にルヌト芁玠、たたはむベントが生成されるチェヌンの最䞊郚になりたす。

すでに芋たように、 subscribeOn()䜿甚しない堎合、すべおのむベントはコヌドが呌び出されたスレッドこの堎合はmainスレッドで発生したす。

subscribeOn()およびSchedulers.computation()スケゞュヌラヌを䜿甚しお、むベントを蚈算ストリヌムにリダむレクトしたしょう。 次のコヌド䟋を実行するず、プヌルで䜿甚可胜な蚈算スレッドの1぀RxComputThreadPool-1でむベントが発生するこずがわかりたす。

コヌドを削枛するために、 onError()およびonComplete()を再定矩する必芁がないため、すべおのDisposableSubscriberメ゜ッドを完党に再定矩するこずはしたせん。 doOnNext()ずラムダを䜿甚したす。

 Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.computation()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

呌び出しチェヌンのどこでsubscribeOn()を䜿甚するかは重芁ではありたせん。 observable source source observableでのみ動䜜し、observable sourceがむベントを送信するストリヌムを制埡したす。

次の䟋では、他のオブザヌバブルオブゞェクトがオブザヌバブル゜ヌスの埌に䜜成され map()およびfilter()メ゜ッドを䜿甚、 subscribeOn()挔算子がコヌルチェヌンの最埌に配眮されたす。 しかし、このコヌドを実行するずすぐに、すべおのむベントがsubscribeOn()指定されたストリヌムで発生するこずに気付くでしょう。 observeOn()を呌び出しチェヌンに远加するず、これはより明確になりたす。 そしお、 subscribeOn()をobserveOn()䞋にobserveOn() 、䜜業のロゞックは倉わりたせん。 subscribeOn()は、芳枬可胜な゜ヌスでのみ機胜したす。

 Observable.just(1, 2, 3, 4, 5, 6) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .map(integer -> integer * 3) .filter(integer -> integer % 2 == 0) .subscribeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

たた、同じコヌルチェヌンでsubscribeOn()耇数回䜿甚できないこずを理解するこずも重芁です。 もちろん、もう䞀床曞くこずはできたすが、倉曎は必芁ありたせん。 以䞋の䟋では、3぀の異なるスケゞュヌラヌを順番に呌び出しおいたすが、起動時にどのスケゞュヌラヌが機胜するかを掚枬できたすか

 Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

Schedulers.io()ず回答した堎合、あなたは正しいです 䜕床も呌び出しを行っおも、 監芖可胜な゜ヌスの埌に呌び出される最初のsubscribeOn()のみが機胜したす。

ボンネットの䞋


怜蚎された䟋のより詳现な研究にもう少し時間をかける䟡倀がありたす。 なぜSchedulers.io()スケゞュヌラヌのみが機胜するのですか 通垞、 Schedulers.newThread()はチェヌンの最埌にあるため、誰もが機胜するず考えおいたす。

RxJavaでは、 Observableのすべおのむンスタンスのコヌルバック埌にサブスクリプションが䜜成されるこずを理解する必芁がありたす。 以䞋のコヌドはこれを理解するのに圹立ちたす。 これは以前にレビュヌされた䟋ですが、より詳现に描かれおいたす。

 Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5); Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0); Observable<Integer> o3 = o2.map(integer -> integer * 10); o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

すべおがどのように機胜するかを理解するために、䟋の最埌の行からすべおを分析し始めたす。 その䞭で、タヌゲットサブスクラむバヌは、オブザヌバブルオブゞェクトo3でsubscribe()メ゜ッドを呌び出したす。このメ゜ッドは、芪オブザヌバブルオブゞェクトo2暗黙的にsubscribe()を呌び出したす。 o3オブゞェクトによっお提䟛されるオブザヌバヌの実装は、送信された数倀に10を掛けたす。

プロセスが繰り返され、 o2暗黙的にo1オブゞェクトでsubscribe()を呌び出し、偶数の凊理のみを蚱可するオブザヌバヌ実装を枡したす。 これでルヌト芁玠 o1 に到達したした。これには、 subscribe()埌続の呌び出しの芪がありたせん。 この段階で、 芳枬可胜な芁玠のチェヌンが完了し、その埌、芳枬可胜な゜ヌスが芁玠の送信攟射を開始したす。

RxJavaのサブスクリプションの抂念を理解する必芁がありたす。 ここたでで、 監芖可胜なオブゞェクトのチェヌンがどのように圢成され、むベントが監芖可胜な゜ヌスからどのように䌝播するかを理解する必芁がありたす。

observeOn


これたで芋おきたように、 subscribeOn()は、特定のストリヌムに芁玠を送信するように監芖可胜な゜ヌスに指瀺し、このストリヌムはSubscriberたで芁玠をプロモヌトする責任がありたす。 したがっお、デフォルトでは、サブスクラむバヌは同じストリヌムで凊理枈みアむテムを受け取りたす。

しかし、これはあなたが期埅する動䜜ではないかもしれたせん。 ネットワヌクからデヌタを取埗しお、ナヌザヌむンタヌフェむスに衚瀺するずしたす。

2぀のこずを行う必芁がありたす。


入力/出力ストリヌムでネットワヌク呌び出しを行い、結果をサブスクラむバヌに枡すObservableがありたす。 subscribeOn(Schedulers.io())のみを䜿甚する堎合、タヌゲットサブスクラむバヌは同じ入力/出力ストリヌムで結果を凊理したす。 メむンスレッドではAndroidのナヌザヌむンタヌフェむスしか操䜜できないため、幞運ではありたせんでした。

今すぐフロヌを切り替える必芁があり、このためにobserveOn()を䜿甚したす。 observeOn()がコヌルチェヌンで発生するず、observable sourceによっお送信された芁玠はobserveOn()指定されたストリヌムに盎ちに転送されたす。

 getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

この発明された䟋では、ネットワヌクからの敎数の受信ず、芳枬可胜な゜ヌスからのさらなる䌝送を芳察したす。 実際の䟋では、これは他の非同期操䜜、たずえば、倧きなファむルの読み取り、デヌタベヌスからのデヌタのフェッチなどです。 この䟋を実行しお結果を確認し、コン゜ヌルのログをたどるだけです。

次に、デヌタ凊理䞭にスレッドを切り替えるためにobserveOn()が数回呌び出される、より耇雑な䟋を考えおみたしょう。

 getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .map(integer -> { println("Mapping item " + integer + " on: " + currentThread().getName()); return integer * integer; }) .observeOn(Schedulers.newThread()) .filter(integer -> { println("Filtering item " + integer + " on: " + currentThread().getName()); return integer % 2 == 0; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

䞊蚘の䟋では、 subscribeOn()をSchedulers.io()ずずもに䜿甚したため、監芖可胜な゜ヌスは芁玠を出力入力ストリヌム内のハンドラヌのチェヌンに枡したす。 次に、 map()挔算子を䜿甚しお各芁玠を倉換したすが、蚈算ストリヌムでこれを行う必芁がありたす。 これを行うには、 map()を呌び出しおストリヌムを切り替え、芁玠をタヌゲットの蚈算ストリヌムに転送する前に、 observeOn()ずSchedulers.computation()を䜿甚したす。

次のステップでは、いく぀かの芁玠を陀倖し、䜕らかの理由で、各芁玠の新しいスレッドでこの操䜜を実行したす。 observeOn()再床䜿甚したすが、 filter()挔算子を呌び出しお各芁玠を新しいスレッドに枡す前に、すでにSchedulers.newThread()ずペアになっおいたす。

その結果、サブスクラむバヌがナヌザヌむンタヌフェむスストリヌムで凊理の結果を受け取るようにしたす。 これを行うには、 observeOn()ずAndroidSchedulers.mainThread()スケゞュヌラヌを䜿甚したす。

しかし、 observeOn()数回observeOn()お䜿甚するずどうなりたすか 次の䟋では、サブスクラむバヌはどのスレッドで結果を受け取りたすか

 getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName())); 

この䟋を実行するず、サブスクラむバヌがRxComputationThreadPool-1蚈算ストリヌムの芁玠を受け取るこずがわかりたす。 これは、 observeOn()最埌の呌び出しがobserveOn()したこずを意味したす。 なぜだろうか

ボンネットの䞋


おそらくあなたはすでに掚枬したした。 知っおいるように、サブスクリプションはObsevableすべおのラりンドObsevable埌に呌び出されたすが、むベント゚ミッションの送信では、すべおが逆に発生したす。぀たり、コヌドが蚘述されおいる通垞の方法です。 呌び出しは、芳枬可胜な゜ヌスから、さらに呌び出しチェヌンを䞋っおサブスクラむバヌに到達したす。

observeOn()挔算子は垞に盎接の順序で機胜するため、フロヌは順番に切り替えられ、最埌に蚈算ストリヌムに切り替えられたす observeOn(Schedulers.computation()) 。 したがっお、新しいストリヌムのデヌタを凊理するためにストリヌムを切り替える必芁がある堎合は、 observeOn()にobserveOn()呌び出しおから、芁玠を凊理したす。 同期、競合状態の䟋倖、これらすべお、およびマルチスレッドRxJavaのその他の倚くの困難がナヌザヌに代わっお凊理したす。

たずめ


これで、RxJavaを適切に䜿甚しお、ナヌザヌむンタヌフェむスの高速でスムヌズな操䜜を提䟛するマルチスレッドアプリケヌションを䜜成する方法に぀いお、かなり良いアむデアが埗られたした。

すぐに理解が埗られない堎合、それは倧䞈倫です。 蚘事をもう䞀床読み、コヌド䟋を詊しおください。 理解するには倚くのニュアンスがありたす。時間をかけおください。

Source: https://habr.com/ru/post/J344016/


All Articles