RxJava 2 for Androidの探玢

ここに画像の説明を入力しおください


私の名前はArkadyです。私はBadooのAndroid開発者です。 最近のブログには、Go、PHP、JS、QAに関する倚くの投皿があり、モバむル開発に関するトピックでそれらを薄めるこずにしたした。 RxJava 1からRxJava 2に1぀のAndroidプロゞェクトを移怍し、むンタヌネットでこのトピックに蚘茉されおいるすべおのものを読みたした。 特に、GOTOコペンハヌゲン2016カンファレンスのJake Wortonのレポヌトです。これは翻蚳にふさわしい候補であるように思われたした。倚くのAndroid開発者がRxJava 2ぞの切り替えを考えおいるず思いたす。


Jakeはリアクティブプログラミングに぀いおかなり倚くの玹介を行ったため、この蚘事を理解するためにRxJava 1の知識は必芁ありたせん。 レポヌトは、RxJava2がリリヌスの準備ができたずきに準備されたしたバヌゞョン2.1.0はすでにリリヌスされおいたす。



反応する理由


なぜ党員がリアクティブプログラミングに぀いお突然話し始めたのですか アプリケヌションを完党に同期化できない堎合、単䞀の非同期リ゜ヌスがあるず、埓来の呜什型プログラミングスタむルが完党に壊れおしたいたす。 「すべおが機胜しなくなる」ずいう意味ではなく、「耇雑さを増す」ずいう意味での「ブレむク」。その結果、呜什型プログラミングのすべおの利点が倱われ始めたす。


私がこれを深刻な問題ず考える理由を説明するために、䟋を挙げたす。


いく぀かの修食子を䜿甚しおUserオブゞェクトを取埗できる単玔なクラスから始めたしょう。


interface UserManager { User getUser(); void setName(String name); void setAge(int age); } UserManager um = new UserManager(); System.out.println(um.getUser()); um.setName("Jane Doe"); System.out.println(um.getUser()); 

同期のシングルスレッドの䞖界に䜏んでいた堎合、このコヌドは、むンスタンスの䜜成、ナヌザヌ出力、䞀郚のプロパティの倉曎、ナヌザヌ出力など、たさに期埅どおりの動䜜をしたす。


問題は、非同期性に頌り始めるずきに発生したす。 サヌバヌ偎のプロパティの倉曎を反映する必芁があるずしたしょう。 これを行うには、最埌の2぀のメ゜ッドが非同期である必芁がありたす。 その堎合、どのようにコヌドを倉曎したすか


解決策の1぀は、䜕もしないこずです。非同期サヌバヌ曎新の呌び出しが成功するず想定しお、ロヌカルで倉曎を加えるこずができたす。 それらは即座に反映されたす。 ご存知のように、これは良い考えではありたせん。 ネットワヌクは予枬䞍胜であり、サヌバヌぱラヌを返す可胜性があるため、䜕らかの理由でロヌカル状態をロヌルバックする必芁がありたす。


簡単な解決策は、非同期呌び出しが正垞に完了したずきに実行されるRunnableを䜿甚するこずです。 これは事埌察応的な動䜜です。倉曎芁求が成功したこずが確実な堎合にのみ、衚瀺されたデヌタを曎新したす。


 interface UserManager { User getUser(); void setName(String name, Runnable callback); void setAge(int age, Runnable callback); } UserManager um = new UserManager(); System.out.println(um.getUser()); um.setName("Jane Doe", new Runnable() { @Override public void run() { System.out.println(um.getUser()); } }); 

ただし、発生する可胜性のある問題ネットワヌクの問題などは凊理したせん。 ゚ラヌが発生した堎合に䜕かできるように、特別なListener䜜成する䟡倀があるのでしょうか


 UserManager um = new UserManager(); System.out.println(um.getUser()); um.setName("Jane Doe", new UserManager.Listener() { @Override public void success() { System.out.println(um.getUser()); } @Override public void failure(IOException e) { // TODO show the error... } }); 

問題に぀いおナヌザヌに通知できたす。 自動的に再詊行できたす。 同様の゜リュヌションが機胜し、この方向では、非同期コヌドず単䞀スレッドAndroidの堎合、これはUIスレッドで実行されるコヌドを組み合わせる必芁がありたす。


非同期呌び出しを行う必芁があるほど、問題が倚くなりたす。 たずえば、ナヌザヌがフォヌムに入力するず、いく぀かのプロパティが倉曎されたす。 たたは、ある呌び出しが正垞に完了するず別の非同期呌び出しがトリガヌされる堎合に、䞀連の非同期呌び出しがあり、成功たたは倱敗も䌎う堎合がありたす。


 UserManager um = new UserManager(); System.out.println(um.getUser()); um. setName(“Jane Doe”, new UserManager.Listener() { @Override public void success() { System.out.println(um.getUser()); } @Override public void failure(IOException e) { // TODO show the error
 } }); um.setAge(40, new UserManager.Listener() { @Override public void success() { System.out.println(um.getUser()); } @Override public void failure(IOException e) { // TODO show the error
 } }); 

これはすべおAndroidのコンテキストで行われるこずを忘れないでください。 したがっお、他の倚くの芁因を考慮する必芁がありたす。 たずえば、 successコヌルバックで情報をUIに盎接転送しようずするこずができたすが、問題はAndroidのActivityが䞀時的であるこずです。 それらはい぀でも砎壊できたす。 ナヌザヌが着信コヌルを受信したずしたしょう-アプリケヌションはシステムによっお最小化されたす。 たたは、ナヌザヌが[ Homeたたは[ Backクリックした可胜性がありたす。 UIの砎棄埌に非同期呌び出しが返される堎合、問題が発生したす。


 public final class UserActivity extends Activity { private final UserManager um = new UserManager(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.user); TextView tv = (TextView) findViewById(R.id.user_name); tv.setText(um.getUser().toString()); um.setName("Jane Doe", new UserManager.Listener() { @Override public void success() { tv.setText(um.getUser().toString()); } @Override public void failure(IOException e) { // TODO show the error... } }); } } 

問題を解決するための必須のアプロヌチがありたす。 UIメ゜ッドを呌び出す前にステヌタスを確認できたす。


 public final class UserActivity extends Activity { private final UserManager um = new UserManager(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.user); TextView tv = (TextView) findViewById(R.id.user_name); tv.setText(um.getUser().toString()); um.setName(“Jane Doe”, new UserManager.Listener() { @Override public void success() { if (!isDestroyed()) { tv.setText(um.getUser().toString()); } } @Override public void failure(IOException e) { // TODO show the error
 } }); } } 

この䟋では、非同期呌び出しが完了するたでActivityぞの参照を保持するため、短期的なメモリリヌクを明確に匕き起こす匿名型を䜜成したす。


問題は、これらのコヌルバックがどのスレッドで呌び出されるかわからないずいう事実にもありたす。 バックグラりンドスレッドで呌び出される可胜性があるため、むベントを実行のメむンスレッド main/UI thread に送信する必芁がありたす。


 public final class UserActivity extends Activity { private final UserManager um = new UserManager(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.user); TextView tv = (TextView) findViewById(R.id.user_name); tv.setText(um.getUser().toString()); um.setName("Jane Doe", new UserManager.Listener() { @Override public void success() { runOnUiThread(new Runnable() { @Override public void run() { if (!isDestroyed()) { tv.setText(um.getUser().toString()); } } }); } @Override public void failure(IOException e) { // TODO show the error... } }); } } 

コヌドによっお解決されたメむンタスクに関連しないものがたくさんあるActivityが散らかっおいたす。 そしお、これはすべお非同期で䜜業を開始し、非同期の結果を凊理するこずです。 非同期芁求呌び出しを実装したした。 ナヌザヌ入力をブロックせず、ボタンクリックを凊理せず、耇数のフィヌルドで動䜜したせん。


コヌドが単玔なタスクを1぀だけ解決し、それを実際のアプリケヌションに倉え始めたずしおも、すぐに問題が発生し、 Activity状態ずチェックの束を管理する必芁に盎面したす。


リアクティブ思考


実際のアプリケヌションでは、すべおが非同期に動䜜したす。 リク゚ストを送信し、長い時間を経お回答を受け取るネットワヌクがありたす。 メむンの実行スレッドをブロックするこずはできないため、ネットワヌクの操䜜はバックグラりンドスレッドで実行する必芁がありたす。 ファむルシステム、デヌタベヌス、リポゞトリぞの曞き蟌み、さらにはshared preferencesぞの曞き蟌み時にメむンスレッドをブロックするこずはできないため、これらの操䜜をバックグラりンドストリヌムで実行する必芁がありたす。


ナヌザヌも非同期デヌタ゜ヌスのようなものです。 UIを介しお情報を提䟛し、ボタンを抌しおフィヌルドにデヌタを入力するこずで、それに応答したす。


ここに画像の説明を入力しおください


ナヌザヌは、異なる時間にアプリケヌションに戻るこずができたす。 たた、アプリケヌションはデヌタを受信する準備ができおいる必芁があり、実行のメむンスレッドがブロックされおいる状態がないようにリアクティブにする必芁がありたす。 そのため、デヌタの䞀郚が非同期的に到着する状況はありたせんが、アプリケヌションはこれを予期せず、その結果、受信したデヌタを考慮せず、クラッシュさえしたせん。 これが困難です。 Activity / Fragmentでこれらの状態をすべお維持する必芁がありたす。 倚数の非同期゜ヌスが、おそらく異なる速床でデヌタを生成および消費するずいう事実ず調和させる必芁がありたす。 たた、非同期プラットフォヌムであるAndroid自䜓の䜜業に぀いおは考慮しおいたせん。 プッシュ通知、ブロヌドキャスト、構成の倉曎がありたす。 ナヌザヌはい぀でもデバむスをポヌトレヌトからランドスケヌプに、たたはその逆に切り替えるこずができたす。コヌドの準備が敎っおいない堎合、アプリケヌションはクラッシュしたり、正しく動䜜したせん。


アプリケヌションのアヌキテクチャ党䜓の同期を保蚌するこずはできたせんが、単䞀の非同期リ゜ヌスがあるず、埓来の呜什型プログラミングスタむルが厩れたす。


ネットワヌク芁求を䜿甚しないアプリケヌションを芋぀けるこずは難しく、それらは本質的に非同期です。 ディスクがあり、デヌタベヌスは非同期゜ヌスです。 UIは、非同期゜ヌスずしおのみ考慮されるべきです。 そのため、デフォルトでは、Androidのすべおが非同期に機胜したす。 埓来の呜什型プログラミングず状態管理の手法に固執するず、自分自身に害を及がしたす。


ここに画像の説明を入力しおください


すべおの非同期アヌキテクチャ芁玠を調敎しようずする代わりに、それらを盎接接続するこずにより、この責任から解攟されたす。 UIをデヌタベヌスに盎接眲名しお、デヌタの倉曎に応答できるようにするこずができたす。 デヌタベヌス呌び出しずネットワヌク呌び出しを倉曎しお、クリックを取埗しお送信するのではなく、ボタンのクリックに応答するようにするこずができたす。


私たちが受け取ったネットワヌク応答がデヌタを曎新するならば、それは玠晎らしいでしょう。 結局、デヌタが曎新されるず、UIは自動的に曎新されたす。 したがっお、私たちはこれに぀いお責任を負いたせん。 Androidが非同期で䜕かを行う堎合たずえば、画面の切り替えやブロヌドキャスト、むンタヌフェむスに自動的に反映されるか、バックグラりンドタスクを自動的に開始するのは玠晎らしいこずです。


ここに画像の説明を入力しおください


䞀般に、このアプロヌチにより、状態をサポヌトするために必芁な倧量のコヌドを蚘述できなくなりたす。状態を管理する代わりに、コンポヌネントを互いに接続するだけです。


Rxjava


RxJavaに枡したす。 このリアクティブラむブラリは、Java開発の最初のフル機胜の[リアクティブ]ツヌルであったため、Android開発で最も人気がありたした。 RxJava 2は、Androidの開発に重芁な叀いバヌゞョンのJavaのサポヌトを保持しおいたす。


RxJavaは以䞋を提䟛したす。



゜ヌス


デヌタ゜ヌスは、リッスンを開始たたは終了するずきに䜕らかの䜜業を行いたす。 応答の埅機を開始するたで送信されないネットワヌク芁求を送信したす。 たた、完了前にデヌタ゜ヌスのサブスクリプションを解陀するず、理論的にはネットワヌク芁求をキャンセルできたす。


゜ヌスは、同期および非同期の䞡方で動䜜できたす。 たずえば、バックグラりンドスレッドで実行されおいるブロッキングネットワヌクリク゚スト、たたはAndroidを呌び出しおonActivityResultを埅機するような玔粋に非同期なものです。 ゜ヌスは、単䞀のアむテムたたは耇数のアむテムを生成できたす。 ネットワヌク芁求は単䞀の応答を返したす。 ただし、UIが機胜しおいる間は、1぀のボタンにサブスクラむブしおいおも、ボタンクリックのフロヌは無限に続く可胜性がありたす。


他の゜ヌスが空になる堎合がありたす。 これは、芁玠を含たず、䜜業が成功たたは倱敗するデヌタ゜ヌスの抂念です。 明確にするために、デヌタベヌスたたはファむルにデヌタを曞き蟌んでいるず想像しおください。 圌らはあなたにアむテムを返したせん。 蚘録は成功するかどうかのいずれかです。 RxJavaでは、゜ヌスは、いわゆる端末むベントonComplete()/ onError()を䜿甚しお、この「実行たたは倱敗」アプロヌチをモデル化したす。 これは、応答を返すか、䟋倖をスロヌするメ゜ッドに䌌おいたす。


完了しおいない可胜性がありたす。 たずえば、UIが機胜しおいる限り機胜するデヌタ゜ヌスずしお、ボタンの抌䞋をシミュレヌトしたす。 そしお、UIが消えるず、おそらくボタンクリックのこの゜ヌスからサブスクラむブを解陀したすが、その䜜業は完了したせん。


これはすべお、 Observerパタヌンに察応しおいたす。 デヌタを生成できるものがありたす。 このデヌタの衚瀺方法に぀いおは合意がありたす。 そしお、私たちはそれらを芋たいです。 リスナヌを远加し、䜕かが発生したずきに通知を受け取りたす。


流動性vs. 芳枬可胜


RxJava 2では、゜ヌスは2぀の䞻芁なタむプ-FlowableずObservableで衚されたす。 それらは非垞に䌌おいたす。 どちらもれロからn個の芁玠を生成したす。 䞡方ずも成功たたは倱敗する堎合がありたす。 では、なぜ同じデヌタ構造を衚すために2぀の異なるタむプが必芁なのでしょうか


それはすべお背圧のようなものになりたす。 詳现に入るこずなく、バックプレッシャヌがデヌタ゜ヌスの速床を䜎䞋させる可胜性があるずしか蚀えたせん。 既存のシステムのリ゜ヌスは限られおいたす。 そしお、バックプレッシャヌの助けを借りお、デヌタを送信するすべおの人に、デヌタが遅くなるように䌝えるこずができたす。


RxJava 1はバックプレッシャヌをサポヌトしおいたしたが、APIの開発䞭にかなり遅れお远加されたした。 RxJava 1では、システムの各タむプにバックプレッシャヌメカニズムがありたす。 バックプレッシャヌの抂念はすべおのタむプでサポヌトされおいたすが、すべおの゜ヌスがそれを実装しおいるわけではないため、このメカニズムを䜿甚するずアプリケヌションがクラッシュする可胜性がありたす。 背圧アプリケヌションは、事前に蚭蚈および怜蚎する必芁がありたす。 これが、RxJava 2に2皮類の゜ヌスがある理由です。 そのため、バックプレッシャヌをサポヌトするかどうかを゜ヌスタむプで指定できたす。


画面タッチむベントずいうデヌタ゜ヌスがあるずしたす。 遅くするこずはできたせん。 ナヌザヌに「キャラクタヌの半分を描き、凊理䞭に停止しお埅機し、残りを終了したす」ず䌝えるこずはできたせん。 ボタンをオフにする、別のUIを衚瀺するなど、別の方法でデヌタ入力を遅くするこずはできたすが、゜ヌス自䜓を遅くするこずはできたせん。


別の䟋を芋おみたしょう。䞀床に耇数の行を抜出する必芁がある行の倧きなセットを含むデヌタベヌスがありたす。 デヌタベヌスは、 カヌ゜ルなどのツヌルのおかげで、この問題を非垞に効果的に解決できたす。 ただし、タッチむベントのフロヌの堎合、ナヌザヌの指を遅くするこずはできないため、これを実装するこずはできたせん。


RxJava 1では、䞊蚘の䞡方のタむプがObservableずしお実装されおいるため、実行時にバックプレッシャヌを適甚しようずするず、 MissingBackpressureException発生する堎合がありたす。 これが、RxJava 2でさたざたなタむプで゜ヌスが衚瀺される理由になりたした。1぀はバックプレッシャヌをサポヌトし、もう1぀はサポヌトしたせん。 ObservableずFlowable䞡方のタむプは、デヌタをコヌルバックに転送するずいう点で同様に動䜜したす。 これには2぀の察応するむンタヌフェむスがありたす。


Observer 


 interface Observer<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Disposable d); } 

そしおSubscriber 


 interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } 

最初のメ゜ッドはonNextず呌ばれ、芁玠はここに配信されたす。 このメ゜ッドは、 ObservableたたはFlowableが芁玠を生成するたびに呌び出され、任意に凊理できるようにしたす。 これは無限に起こり埗たす。 ボタンのクリックを聞くず、クリックするonNextメ゜ッドが呌び出されたす。 無限の゜ヌスには、2぀の端末むベントがありたす。



onCompleteずonErrorは端末むベントです。぀たり、いずれかを受信した埌、゜ヌスからむベントを受信するこずはなくなりたす。
ObserverむンタヌフェヌスずSubscriberむンタヌフェヌスの違いは、最埌のメ゜ッドonSubscribeです。 これは、RxJava 1ず比范した新しい方法です。ObservableたたはFlowableをサブスクラむブする堎合、リ゜ヌスを䜜成したす。リ゜ヌスの操䜜が終了したら、リ゜ヌスをクリヌンアップする必芁がありたす。 onSubscribeは、ObservableたたはonSubscribeリッスンを開始するずすぐに呌び出され、 Disposableの2぀のタむプのオブゞェクトを提䟛したす。


 interface Observer<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Disposable d); } interface Disposable { void dispose(); } 

たたはSubscription 


 interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } interface Subscription { void cancel(); void request(long r); } 

Observableに関しおObservable 、 Disposableタむプを䜿甚するず、disposeメ゜ッドを呌び出すこずができたす。぀たり、「このリ゜ヌスでの䜜業が終了したした。デヌタは䞍芁です」ずいう意味です。 ネットワヌク芁求がある堎合は、キャンセルできたす。 ボタン抌䞋の無限のストリヌムを聞いた堎合、これはこれらのむベントを受信したくないこずを意味したす。その堎合、 ViewからOnClickListenerを削陀できView 。


これはすべお、 Subscriptionむンタヌフェむスにも圓おはたりたす。 呌び出し方法は異なりたすが、たったく同じ方法で䜿甚されたすdispose()䌌たcancel()メ゜ッドがありたす。 これは、2番目のrequest(long r)メ゜ッドが存圚するこずでのみ異なりたす。これにより、APIにbackpressureが珟れたす。 このメ゜ッドを䜿甚しお、 Flowableさらに芁玠が必芁であるこずを䌝えたす。


背圧サポヌト付き背圧サポヌトなし
0 – n芁玠、 complete | error complete | error流動性芳枬可胜

したがっお、これら2぀のタむプの唯䞀の違いは、䞀方がバックプレッシャヌをサポヌトし、もう䞀方がサポヌトしないこずです。


ゞェットストリヌム


DisposableタむプずSubscriptionタむプの名前が異なる理由ず、そのメ゜ッドdispose()ずcancel()に぀いお觊れたいず思いたす。 request()メ゜ッドを远加するこずで、䞀方だけを拡匵できなかったのはなぜですか ゞェットストリヌムの仕様がすべおです。 これは、倚数の䌁業が率先しお取り組み、Javaリアクティブラむブラリ甚の暙準むンタヌフェむスセットを開発するこずを決定した結果です。 仕様には4぀のむンタヌフェむスが含たれたす。


 interface Publisher<T> { void subscribe(Subscriber<? super T> s); } interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } interface Subscription { void request(long n); void cancel(); } interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } 

䞊蚘のコヌドでは、 SubscriberずSubscription皮類が衚瀺されたす。 これらは仕様の䞀郚であり、したがっお、これらの名前はRxJava 2で䜿甚されおいたした。これらは暙準の䞀郚であるため、それに察しおできるこずは䜕もありたせん。 しかし、この状況には良い面がありたす。 ストリヌムに2぀の異なるラむブラリを䜿甚する必芁があるずしたしょう。 䜜成者が䞊蚘の暙準を実装しおいる堎合、それらを安党に切り替えるこずができたす。


リアクティブストリヌムバックプレッシャヌサポヌト付き背圧サポヌトなし
0 ... n芁玠、 complete | error complete | error流動性芳枬可胜

Flowableタむプは、背圧のサポヌトを意味するリアクティブフロヌの仕様を実装したす。


UserManager戻りたす。 以前はこのクラスからナヌザヌを抜出し、適切だず思ったずきに衚瀺しおいたした。 これで、Observableを䜿甚できたす。


 interface UserManager { Observable<User> getUser(); void setName(String name); void setAge(int age); } 

Observable<User>は、Userオブゞェクトの゜ヌスです。 すべおの倉曎で芁玠を生成し、画面にデヌタを衚瀺するこずでこれに察応できたす。 これで、システムで発生する他のむベントに基づいお、これに最適な時間を決定する必芁がなくなりたす。


特化した゜ヌス


RxJava 2には、 Observableサブセットである3぀の特殊な゜ヌスがありたす。 最初のものはSingleず呌ばれたす。 単䞀の芁玠を含むか、゚ラヌを生成するため、これは単䞀の芁玠の朜圚的に非同期の゜ヌスほど芁玠のシヌケンスではありたせん。 バックプレッシャヌはサポヌトしおいたせん。 通垞の方法ずしお想像できたす。 メ゜ッドを呌び出しお戻り倀を取埗したす。 メ゜ッドが䟋倖をスロヌしたす。 Single実装するのはこのスキヌムです。 賌読するず、アむテムたたぱラヌが衚瀺されたす。 しかし同時に、 Singleはリアクティブです。


Completable . void-. - , . , , , .


— Maybe . RxJava 1. , , — Optional. backpressure.


RxJava 2 , Single/ Completable/ Maybe , backpressure ( Reactive Streams Specification).


( backpressure)backpressure
0
n , complete | errorFlowableObservable
item | complete | errorMaybe
item | errorSingle
complete | errorCompletable

 interface UserManager { Observable<User> getUser(); void setName(String name); void setAge(int age); } 

setName setAge , , , . Completable .


 interface UserManager { Observable<User> getUser(); Completable setName(String name); Completable setAge(int age); } 


, , , . , .


 Flowable.just("Hello"); Flowable.just("Hello", "World"); Observable.just("Hello"); Observable.just("Hello", "World"); Maybe.just("Hello"); Single.just("Hello"); 

Iterable .


 String[] array = { “Hello”, “World” }; List<String> list = Arrays.asList(array); Flowable.fromArray(array); Flowable.fromIterable(list); Observable.fromArray(array); Observable.fromIterable(list); 

, , , , ( , ).


fromCallable .


 Observable.fromCallable(new Callable<String>() { @Override public String call() { return getName(); } }); 

, . fromCallable Java- Callable , , . , HTTP- -.


 OkHttpClient client = // 
 Request request = // 
 Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception{ return client.newCall(request).execute(); } }); 

Observable ( ) , , onError . , onNext .


fromCallable :


 Flowable.fromCallable(() -> "Hello"); Observable.fromCallable(() -> "Hello"); Maybe.fromCallable(() -> "Hello"); Single.fromCallable(() -> "Hello"); Completable.fromCallable(() -> "Ignored!"); 

. , .


Maybe Completable . , , – , .


 Maybe.fromAction(()-> System.out.println(“Hello”)); Maybe.fromRunnable(()-> System.out.println(“Hello”)); Completable.fromAction(()-> System.out.println(“Hello”)); Completable.fromRunnable(()-> System.out.println(“Hello”)); 

, , Observable create .


 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Hello"); e.onComplete(); } }); 

RxJava 1, – , RxJava 1. RxJava 2, create – , . , subscribe, . ObservableEmitter , . ObservableEmitter . , .


.


 Observable.create(e -> { e.onNext("Hello"); e.onComplete(); }); 

.


 Observable.create(e -> { e.onNext("Hello"); e.onNext("World"); e.onComplete(); }); 

onNext .


— . , HTTP-, onNext HTTP-.


 OkHttpClient client = // 
 Request request = // 
 Observable.create(e -> { Call call = client.newCall(request); call.enqueue(new Callback() { @Override public void onResponse(Response r) throws IOException { e.onNext(r.body().string()); e.onComplete(); } @Override public void onFailure(IOException e) { e.onError(e); } }); }); 

, Observable reate , , . - HTTP-, . HTTP- .


 Observable.create(e -> { Call call = client.newCall(request); e.setCancelation(() -> call.cancel()); call.enqueue(new Callback() { @Override public void onResponse(Response r) throws IOException { e.onNext(r.body().string()); e.onComplete(); }A @Override public void onFailure(IOException e) { e.onError(e); } }); }); 

Android . , Observable , , Listener , .


 View view = // 
 Observable.create(e -> { e.setCancellation(() -> view.setOnClickListener(null)); view.setOnClickListener(v -> e.onNext(v)); }); 

create :


 Flowable.create(e -> { 
 }); Observable.create(e -> { 
 }); Maybe.create(e -> { 
 }); Single.create(e -> { 
 }); Completable.create(e -> { 
 }); 


onSubscribe Observer / Subscriber .


Observer :


 interface Observer<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Disposable d); } interface Disposable { void dispose(); } 

Subscriber :


 interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } interface Subscription { void cancel(); void request(long r); } 

, Observer/ Subscriber , subscribe. - onSubscribe – - Disposable / Subscription .


 Observable<String> o = Observable.just(“Hello”); o.subscribe(new Observer<String>() { @Override public void onNext(Sring s) { 
 } @Override public void onComplete() { 
 } @Override public void onError(Throwable t) { 
 } @Override public void onSubscribe(Disposable d) { ??? } }); 

DisposableObserver , , Observable .


 Observable<String> o = Observable.just("Hello"); o.subscribe(new DisposableObserver<String>() { @Override public void onNext(String s) { 
 } @Override public void onComplete() { 
 } @Override public void onError(Throwable t) { 
 } }); 

? .
– DisposableObserver Observer . Disposable , dispose, .


 Observable<String> o = Observable.just(“Hello”); DisposableObserver observer = new DisposableObserver<String>() { @Override public void onNext(Sring s) { 
 } @Override public void onComplete() { 
 } @Override public void onError(Throwable t) { 
 } } o.subscribe(observer); observer.dispose(); 

RxJava 2 subscribeWith , subscribe RxJava 1. Disposable .


 Observable<String> o = Observable.just(“Hello”); Disposable d = new o.subscribeWith(new DisposableObserver<String>() { @Override public void onNext(String s) { 
 } @Override public void onComplete() { 
 } @Override public void onError(Throwable t) { 
 } }); d.dispose(); 

RxJava Disposable : , Disposable , CompositeDisposable .


 Observable<String> o = Observable.just(“Hello”); CompositeDisposable disposables = new CompositeDisposable(); disposables.add(o.subscribeWith(new DisposableObserver<String>() { @Override public void onNext(Sring s) { 
 } @Override public void onComplete() { 
 } @Override public void onError(Throwable t) { 
 } })); disposables.dispose(); 

Android , CompositeDisposable Activity , onDestroy ( -).


subscribeWith backpressure.


 Observable<String> o = Observable.just(“Hello”); Disposable d2 = o.subscribeWith(new DisposableObserver<String>() { 
 }); Maybe<String> m = Maybe.just(“Hello”); Disposable d3 = m.subscribeWith(new DisposableMaybeObserver<String>() { 
 }); Single<String> s = String.just(“Hello”); Disposable d4 = s.subscribeWith(new DisposableSingleObserver<String>() { 
 }); Completable c = Completable.completed(); Disposable d5 = c.subscribeWith(new Disposable Completable Observer<String>() { 
 }); 

Flowable subscribeWith , , Flowable onSubscribe Subscription , Disposable .


 Flowable<String> f = Flowable.just("Hello"); Disposable d1 = f.subscribeWith(new DisposableSubscriber<String>() { 
 }); 

Disposable , Flowable .


, , . , - . , . Disposable Observable, .


オペレヌタヌ


:



.
, , . , toUppercase() .


 String greeting = “Hello”; String yelling = greeting.toUppercase(); 

Observable .


 Observable<String> greeting = Observable.just("Hello"); Observable<String> yelling = greeting.map(s -> s.toUppercase()); 

map . - , .


User : , , . , , .


 Observable<User> user = um.getUser(); Observable<User> mainThreadUser = user.observeOn(AndroidSchedulers.mainThread()); 

: « Observable ». .


observeOn , Observable .


 OkHttpClient client = // 
 Request request = // 
 Observable<Response> response = Observable.fromCallable(() -> { return client.newCall(request).execute(); }); Observable<Response> backgroundResponse = response.subscribeOn(Schedulers.io()); 

. , , . , ( ), . , Schedulers.io() — . , . subscribeOn — , .


, Observable . , . – . . . . . .


 OkHttpClient client = // 
 Request request = // 
 Observable<Response> response = Observable.fromCallable(() -> { return client.newCall(request).execute(); }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(response -> response.body().string()); // NetworkOnMainThread! 

map observeOn , Android. HTTP- – , .


 OkHttpClient client = // 
 Request request = // 
 Observable<Response> response = Observable.fromCallable(() -> { return client.newCall(request).execute(); }) .subscribeOn(Schedulers.io()) .map(response -> response.body().string()); // Ok! .observeOn(AndroidSchedulers.mainThread()) 


RxJava , Observable . , first() , . RxJava 1 Observable , . : , get(0) , , , , – . RxJava 2 : first() , , Single .


ここに画像の説明を入力しおください


Observable , , Single , .


ここに画像の説明を入力しおください


firstElement() , Maybe . Observable Maybe .


ここに画像の説明を入力しおください


, Completable . , , ignoreElements .


ここに画像の説明を入力しおください


Flowable : , .


.


ここに画像の説明を入力しおください


«» . , , , «» Single . «» . , Single Observable .



, User : « , UI ». User', , .


 um.getUser() .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<User>() { @Override public void onNext(User user) { tv.setText(user.toString()); } @Override public void onComplete() { /* ignored */ } @Override public void onError(Throwable t) { /* crash or show */ } })); 

, - Disposable. Android, Activity . onDestroy Disposables .


 // onCreate disposables.add(um.getUser() .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<User>() { @Override public void onNext(User user) { tv.setText(user.toString()); } @Override public void onComplete() { /* ignored */ } @Override public void onError(Throwable t) { /* crash or show */ } })); // onDestroy disposables.dispose(); 

, , , . – . .


 disposables.add(um.setName("Jane Doe") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onComplete() { // success! re-enable editing } @Override public void onError(Throwable t) { // retry or show } })); 

: . Disposable . Disposable .


RxJava 2 , Android: . , Observable , . map, Observable , , .


. RxJava 2 . , . , , – API.


おわりに


RxJava 2 : — , Android, , UI — , , .


RxJava 1, , . RxJava 2.


 class RxJavaInterop { static <T> Flowable<T> toV2Flowable(rx.Observable<T> o) { 
 } static <T> Observable<T> toV2Observable(rx.Observable<T> o) { 
 } static <T> Maybe<T> toV2Maybe(rx.Single<T> s) { 
 } static <T> Maybe<T> toV2Maybe(rx.Completable c) { 
 } static <T> Single<T> toV2Single(rx.Single<T> s) { 
 } static Completable toV2Completable(rx.Completable c) { 
 } static <T> rx.Observable<T> toV1Observable(Publisher<T> p) { 
 } static <T> rx.Observable<T> toV1Observable(Observable<T> o, 
) { 
 } static <T> rx.Single<T> toV1Single(Single<T> o) { 
 } static <T> rx.Single<T> toV1Single(Maybe<T> m) { 
 } static rx.Completable toV1Completable(Completable c) { 
 } static rx.Completable toV1Completable(Maybe<T> m) { 
 } } 

— . , . , - .



Source: https://habr.com/ru/post/J328434/


All Articles