RxJavaの概要:シーケンスの作成

画像

Rxの基本原則を理解したので、シーケンスを作成および管理する方法を学習します。 シーケンス管理スタイルは、関数型プログラミングに触発された元のC# LINQから借用されました。 すべての操作をトピックで分割します。トピックは操作の複雑さの順に並べられています。 ほとんどのRxオペレーターは既存のシーケンスを管理しますが、最初にそれらを作成する方法を学びます。


内容
内容:


パート2-シーケンスの基本


シーケンスを作成する


以前はSubjectを使用し、値を手動で渡してシーケンスを作成しました。 これは、基本的なRx subscribeメソッドなど、いくつかの重要なポイントを示すために行いました。 ほとんどの場合、 Subjectは新しいObservableを作成する最良の方法ではありません。 このセクションでは、これを行うよりエレガントな方法を見ていきます。


シンプルなファクトリーメソッド


観察可能


Observable作成するjust 、所定数の値が返されて完了します。


 Observable<String> values = Observable.just("one", "two", "three"); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Received: one Received: two Received: three Completed 

Observable.empty


このObservableonCompletedイベントのみをonCompleted 、それ以外は何もonCompletedしません。


 Observable<String> values = Observable.empty(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Completed 

Observable.never


このObservableは何も発行しません。


 Observable<String> values = Observable.never(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

上記のコードは何も出力しません。 しかし、これはプログラムがブロックされているという意味ではありません。 実際、それは即座に終了します。


Observable.error


このObservableはonErrorイベントをスローして終了します。


 Observable<String> values = Observable.error(new Exception("Oops")); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Error: java.lang.Exception: Oops 

Observable.defer


deferは新しいObservable作成しませんが、サブスクライバーが表示されたときにObservableを作成する方法を決定できます。 現在の時刻を表示するObservableを作成する方法を考えてください。 値が1つしかないため、ここで役立つことがあるようです。


 Observable<Long> now = Observable.just(System.currentTimeMillis()); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println); 

おわりに


 1431443908375 1431443908375 

1秒後にサインアップする2番目のサブスクライバーが同じ時間を受信したことに注意してください。 これは、時間値が一度だけ計算されたためです:実行がjustメソッドに達したとき。 ただし、この場合、各サブスクリプションの現在時刻を計算します。 deferObservableを返す関数を受け入れ、新しいサブスクライバーごとに実行されます。


 Observable<Long> now = Observable.defer(() -> Observable.just(System.currentTimeMillis())); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println); 

おわりに


 1431444107854 1431444108858 

Observable.create


createObservableを作成するための非常に強力な方法です。


 static <T> Observable<T> create(Observable.OnSubscribe<T> f) 

すべてが見た目よりもはるかに単純です。 内部は、タイプT Subscriberを受け入れる単なる関数ですT その中で、サブスクライバーに発行されるイベントを手動で決定できます。


 Observable<String> values = Observable.create(o -> { o.onNext("Hello"); o.onCompleted(); }); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Received: Hello Completed 

誰かがObservable (この場合はvalues )をサブスクライブすると、 Subscriberの対応するインスタンスがcreate関数に渡されます。 コードが実行されると、値がサブスクライバーに渡されます。 シーケンスの終了を通知するには、自分でonCompletedメソッドを呼び出す必要があることに注意してください。


このメソッドは、他のメソッドがどれも機能しない場合にObservableを作成するための推奨される方法です。 これはSubjectを作成して手動で値を渡す方法と似ていますが、いくつかの重要な違いがあります。 まず、イベントソースはきちんとカプセル化され、他のコードから分離されます。 第二に、 Subject明らかな危険があります。オブジェクトにアクセスできる人は誰でもシーケンスを変更できます。 後でこの問題に戻ります。


Subjectを使用することとのもう1つの重要な違いは、新しいサブスクライバーが到着したときにのみコードが「怠lazに」実行されることです。 上記の例では、コードはObservableされた時点では実行されません (まだサブスクライバーがないため)が、 subscribeメソッドが呼び出された時点では実行されます。 これは、 ReplaySubjectように、各サブスクライバーの値が再計算されることを意味します。 最終結果は、キャッシュを除いてReplaySubjectに似ています。 create実行を別のスレッドに簡単に転送することもできますが、 ReplaySubject 、値を計算するために手動でスレッドを作成する必要があります。 また、 onSubscribeメソッドを並行してonSubscribeする方法も検討します。


以前のObservableいずれもObservableを使用して実装できることにお気づきかもしれません。 create例はObservable.just("hello")と同等です。


機能的方法


関数型プログラミングでは、通常は無限のシーケンスを作成します。


Observable.range


機能プログラマー向けのシンプルで馴染みのある方法。 指定された範囲から値を返します。


 Observable<Integer> values = Observable.range(10, 15); 

この例では、10から24までの値を順番に返します。


Observable.interval


この関数は、指定された時間間隔で区切られた値の無限シーケンスを作成します。


 Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); System.in.read(); 

おわりに


 Received: 0 Received: 1 Received: 2 Received: 3 ... 

配信を停止するまで、シーケンスは終了しません。


例の最後で入力をブロックする必要がある理由に注意する必要があります。 これがないと、プログラムは何も印刷せずに終了します。 これは、すべての操作が非ブロッキングであるためですObservable定期的に作成し、これらの値が到着したときに何らかのアクションを実行するサブスクライバーを登録します。 このいずれも、メインスレッドの終了をブロックしません。


Observable.timer


Observable.timerは2つのオーバーロードがあります。 最初のオプションは、一定期間後に0Lを発行するObservableを作成します。


 Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Received: 0 Completed 

2番目のオプションは、事前に決められた期間を想定してから、指定された頻度のintervalと同じ方法で値の生成を開始します。


 Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Received: 0 Received: 1 Received: 2 ... 

上記の例は2秒待機してから、1秒ごとにカウントを開始します。


観測可能になる


Javaには、シーケンス、コレクション、および非同期イベントを操作するためのツールがありますが、これらはRxと直接互換性がない場合があります。 次に、これらをRxコードの入力データに変換する方法を見ていきます。


EventHandlerを使用する場合、 Observable.createを使用して、イベントのシーケンスを作成できます。


 Observable<ActionEvent> events = Observable.create(o -> { button2.setOnAction(new EventHandler<ActionEvent>() { @Override public void handle(ActionEvent e) { o.onNext(e) } }); }) 

特定のイベントに応じて、そのタイプ(この場合はActionEvent )自体がObservableタイプになるのに十分な情報を運ぶことができます。 ただし、非常に多くの場合、たとえばイベント発生時の特定のフィールドの値など、何か他のものが必要になる場合があります。 UIスレッドがブロックされ、フィールド値が関連している間に、ハンドラー内でそのようなフィールドの値を取得することが最善です。 そして、最終加入者に到達するまで値が変更されないという保証はありませんが、正しく実装されたRxコードでは、変更は消費者側で制御されます[1]。


Observable.from


createを使用して、任意の入力をObservable変換できcreate 。 ただし、一般的なデータ型の場合、このプロセスを容易にするために設計された既製のメソッドがあります。


FutureはJavaの一部であり、マルチスレッドフレームワークで作業しているときに遭遇したに違いありません。 1つの値しか返さないため、Rxよりも強力なマルチスレッドツールではありません。 通常、それらをObservableに変換します。


 FutureTask<Integer> f = new FutureTask<Integer>(() -> { Thread.sleep(2000); return 21; }); new Thread(f).start(); Observable<Integer> values = Observable.from(f); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Received: 21 Completed 

Observable FutureTask準備完了の結果をFutureTask完了します。 タスクがキャンセルされた場合、observableはjava.util.concurrent.CancellationExceptionエラーをスローします。


限られた時間だけFutureの結果に興味がある場合、引数としてタイムアウトを設定することができます。


 Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS); 

この間にFutureが完了しない場合、observableは結果を無視しTimeoutExceptionTimeoutException


Observable.fromを使用すると、コレクションをシーケンスに変換できます。 Observableが作成され、コレクションの各要素が個別にonCompletedれ、最後にonCompletedが発行されます。


 Integer[] is = {1,2,3}; Observable<Integer> values = Observable.from(is); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); 

おわりに


 Received: 1 Received: 2 Received: 3 Completed 

ObservableIterableStreamと同じではありません。 Observableプッシュ指向onNextを呼び出すと、最後のsubscribeメソッドまで実行するハンドラーのスタックが呼び出されるという意味で。 残りのモデルはプル指向です-一方、モデル内の値は要求され、結果が返されるまで実行はブロックされます。


[1] 消費者Observableによって与えられた価値を吸収する消費者


現在、プロジェクトには独自のパブリックリポジトリがあり、誰でもRxの詳細なロシア語チュートリアルの作成に参加できます この部分の翻訳すでにそこにあり、残りはすぐに表示され、あなたの助けを借りて、さらに速くなります。



Source: https://habr.com/ru/post/J281633/


All Articles