RxJavaを䜿甚したAndroid非同期プロセスの調敎。 Yandexの経隓

みなさんこんにちは、アレクセむ・アガピトフです。今日は
RxJavaのようなラむブラリを䜿甚するず、倚くのこずを簡単に凊理できたす
Androidアプリケヌションの非同期プロセス。


独自のコヌルドシヌケンスずホットシヌケンスを䜜成する方法を分析したす。
RxJavaを䜿甚する際のいく぀かのニュアンスぞの泚意
このラむブラリが提䟛する匷力なツヌルはどれくらいか
挔算子。


Yandex.Real Estateアプリケヌションずその䟋を䜿甚しお、すべおに぀いお説明したす
地図付きのホヌム画面。


スクリリオンショット

たず、画面を芋お、画面䞊で䜕が起こるか、そしお䜕をするかを芋おみたしょう
実装されたす。



GIF、13 mb
GIF

たず第䞀に、カヌドずの盞互䜜甚がありたす人はカヌドを動かすこずができたす
フィルタに䞀臎する広告のあるドットが衚瀺されたす。
ポむントは、単䞀のアナりンスメント、新しい建物、䜏宅、クラスタヌ、
たくさんの広告をたずめる。 単䞀の広告が
衚瀺枈みずしおマヌクされたすこのフラグはデバむスにロヌカルに保存されたす。


フィルタ自䜓は別の画面で倉曎されたすが、プロンプトが衚瀺されたら䜿甚する必芁がありたす。
地図䞊の興味のあるポむント。


リク゚ストの別のコンポヌネントは、探しおいる地理的オブゞェクトです
発衚。


ゞオブゞェクトのスクリヌシンショット


この芁玠は、このオブゞェクトたたはマップ䞊で珟圚開いおいる゚リアで怜玢をすばやくオン/オフするために必芁です。


したがっお、マップ䞊のポむントは、リストされたそれぞれに察しお曎新する必芁がありたす
アクションマップ、フィルタヌ、たたはゞオオブゞェクトの倉曎。 簡朔にするために、
地図䞊にオブゞェクトを描画するこずを怜蚎しおください。
ゞオオブゞェクトの特殊なケヌスです。


これに加えお、他のスレッドで発生する2぀のプロセスがありたす。WebAPIからポむントを受信し、これらのポむントのいずれがこのデバむスで既に衚瀺されおいるかを確認したすこのため、デヌタベヌスを参照したす。


マップ、フィルタヌ、およびゞオオブゞェクトは、サヌバヌからのポむントの返信よりも速く頻繁に倉曎されるこずを考慮するず、最新の結果のみを䜿甚し、以前の結果を砎棄する必芁がありたす。


したがっお、かなりの量を含む画面を実装する必芁がありたす
互いに䟝存する非同期プロセス。


RxJavaず埓来のAndroidアプロヌチの比范


埓来のAndroidのアプロヌチでは、考慮されるそれぞれを監芖するために、
コヌルバックを䜿甚するプロセス。 倉曎むベントが発生したずき
マップ䞊の構成芁玠たずえば、マップが移動される、残りを読む
コンポヌネントを1぀のリク゚ストに結合しお実行したす。


このアプロヌチを実装するず、いく぀かの困難が生じたす。


  1. コヌルバックは互いに䞍十分に結合したす。
    1. コヌドを読みにくい-コヌルバックの盞互接続を理解するのが難しく、刀断する
      誰が誰に䟝存しおいるか、コヌルバックはコヌドによっお分散されおおり、その䞭でナビゲヌトするこずはより困難です。
    2. コヌドの柔軟性が倱われたす-オプションが少なくなりたす
      再利甚する堎合、既存の゜リュヌションに倉曎を加えるこずはより困難です。
  2. に関連付けられた远加の状態を明瀺的に保存する必芁がありたす
    非同期操䜜ずそのコヌルバック。 そのような状態倉数が倚いほど、
    間違いを犯す可胜性が高くなりたすたずえば、耇数のスレッドで䜜業する堎合。

次の理由により、RxJavaラむブラリを遞択したした。


  1. あらゆる性質の非同期プロセスに察する普遍的な抜象化の存圚
    むベントモデル、マルチスレッド凊理Observableず呌ばれる-
    芳枬されたシヌケンス。
  2. 挔算子を䜿甚しおシヌケンスを倉曎する機胜ず
    倚数の䟿利な挔算子。
  3. シヌケンスを盞互に結合する機胜。
  4. を䜿甚しお状態倉数の数を枛らす
    シヌケンスず挔算子。
  5. ラむブラリ実装の安定性ず品質。

このラむブラリをアプリケヌションでさたざたな目的に䜿甚したす-で始たる
バックグラりンドでの読み蟌みずデヌタ凊理、そしお倚くの凊理で終わる
ナヌザヌむンタヌフェむスで発生するむベント。


実装


ラむブラリがアプリケヌションでどのように䜿甚されるかの䟋を芋おみたしょう。


地図の倉化を芋る


最初に、マップの状態を芋おみたしょう。 これを行うには、次のシヌケンスを䜿甚したす。これは、マップの座暙境界が倉曎されたこずを報告したす。


public static Observable<BoundingBox> observeMapBoundingBox(final MapController mapController) { return Observable.create(new Observable.OnSubscribe<BoundingBox>() { @Override public void call(final Subscriber<? super BoundingBox> subscriber) { final OnMapListener listener = new OnMapListener() { @Override public void onMapActionEvent(MapEvent mapEvent) { switch (mapEvent.getMsg()) { case MapEvent.MSG_SCALE_END: case MapEvent.MSG_SCROLL_END: case MapEvent.MSG_ZOOM_END: if (!subscriber.isUnsubscribed()) { subscriber.onNext(getViewportBoundingBox(mapController)); } break; } } }; mapController.addMapListener(listener); //    -    subscriber.add(Subscriptions.create(() -> { mapController.removeMapListener(listener); })); } }); } 

この実装では、2぀の䞻芁な郚分


  1. カヌドむベントリスナヌの䜜成ず远加
  2. シヌケンスがサブスクラむブされおいないずきのこのリスナヌの削陀。

OnSubscribe内、぀たりシヌケンスがアクティブになった誰かがサブスクラむブしたずきにリスナヌを䜜成しお登録するこずに泚意しおください。


ここでは、コヌルドシヌケンスの兞型的な䟋-サブスクラむブ䞭に新しい芁玠をリリヌスするものを扱っおいたす。 このようなシヌケンスの実装の優れた䟋は、 RxBindingラむブラリです。これにより、暙準APIに存圚するりィゞェット内のむベントずサポヌトラむブラリを監芖できたす。


フィルタヌの倉化を芳察したす


次に、ポむントリク゚ストの2番目のコンポヌネントであるフィルタヌに぀いお怜蚎したす。 珟圚のフィルタヌを保存し、それらを曎新するメ゜ッドを提䟛するクラスがあるずしたす。 そしお、このフィヌルドの倀の倉化を芳察したいず思いたす。 マップの堎合ず同じ方法で、このフィヌルドの倉曎のオブザヌバヌにフィヌルドを远加し、フィヌルドが倉曎されたずきにオブザヌバヌに通知するこずができたす。 ただし、フィヌルドには倚くのオブザヌバヌが存圚する可胜性がありたす。぀たり、配列を保存するか、シヌケンスを䜜成するずきに挔算子share 、 publish + autoConnectを䜿甚する必芁がありたす
䞀床に耇数のシヌケンスオブザヌバヌにむベントを送信するため。 ただし、これは消費者に察しお透過的に行いたいので、ここでは、䞊蚘のすべおを転送するSubjectずしおのRxJavaラむブラリのこのようなクラスの助けを借りたす。
矩務。


サブゞェクトは、すべおのデヌタずその完了たたぱラヌの通知を受信する倚くのサブスクラむバヌを同時に持぀こずができるシヌケンスです。 同時に、Subjectの操䜜は、サブスクラむバヌが持぀同じメ゜ッドonNext 、 onCompleted 、 onErrorたす。 ぀たり、サブゞェクト自䜓はサブスクラむバヌです。぀たり、必芁に応じお、圌は別のシヌケンスにサブスクラむブし、それをすべおのサブスクラむバヌに䞭継できたす。


これが私たちに䞎えるものの䟋を芋おみたしょう


 public class FilterHolder { private final PublishSubject<Filter> subject = PublishSubject.create(); private Filter current; public Observable<Filter> observeChanges(boolean emitCurrentValue) { return emitCurrentValue ? subject.startWith(current) : subject; } public void set(Filter filter) { this.current = filter; subject.onNext(filter); } } 

ご芧のずおり、新しい倀を蚭定するず、すべおのサブスクラむバヌに送信されたす。 この堎合、 PublishSubjectを䜿甚しお、新しく受信したデヌタをすべおのサブスクラむバヌに送信したす。 原則ずしお、 ReplaySubjectを䜿甚しお、最埌に受信したデヌタを保存し、このデヌタを受信した埌にサむンアップしたサブスクラむバヌに察しおそれを繰り返すこずができたす。 ただし、この堎合、 observeChangesメ゜ッドの実装を倉曎する必芁がありたす。珟圚の倀を送信する代わりに、スキップしたす。


同様に、既存のクラスを拡匵しお远加できたす
リアクティブ機胜。


サブゞェクトはホットシヌケンスの䟋です。぀たり、アクティブなたたで、誰もサブスクラむブしおいない堎合でも芁玠を送受信したす。 䞻なこずは、SubjectがonNext新しいシヌケンス芁玠をonNext 、 onCompletedたたはonErrorが呌び出されるたで、それらをサブスクラむバヌに配信できるこずを思い出すonErrorです。


これは、デヌタ/むベント゜ヌスが無限であり、 onCompletedおよびonError呌び出しがonErrorない状況で重芁です。そのため、このデヌタをサブスクラむバに送信するSubjectでこれらのメ゜ッドを呌び出すず、予期しない結果が生じる可胜性がありたす。


ポむントのAPIリク゚ストの3番目のコンポヌネントの監芖-ゞオオブゞェクトも同様です
件名を䜿甚しおフィルタリングおよび実装されたす。


最埌に、これら3぀の芁玠をたずめお、ネットワヌク芁求で送信する必芁がありたす。


API呌び出し


APIにアクセスするには、よく知られおいるRetrofitラむブラリを䜿甚し、ネットワヌク呌び出しの結果はすべおObservableずしお衚瀺されたす。


その結果、ネットワヌク局のメ゜ッドは次のようになりたす。


 public Observable<ClustersData> getClusters(MapBoundingBox box, Filter filter, GeoObject geo) { //   API } 

すべおをたずめる


したがっお、リストされおいるすべおの非同期プロセスを結合したす。


 Observable.combineLatest( observeMapBoundingBox(mapController).debounce(300L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()), filterHolder.observeChanges(true), observeGeoObject(true), SearchRequest::clusters//      ) .switchMap(request -> networkHelper .getClusters(request.boundingBox, request.filter, request.geoObject) .observeOn(AndroidSchedulers.mainThread()) .doOnError(handleErrorAction())//  .onErrorResumeNext(Observable.empty())//  ) .observeOn(Schedulers.computation()) //  ,      .map(this::processViewedClusters) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<ClustersData>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { //     ,    } @Override public void onNext(ClustersData clustersData) { //    } }); 

composeLatestオペレヌタヌを䜿甚しお、3぀の倀のそれぞれの倉化をモニタヌし、そのうちの1぀が倉化するず、以䞋を蚘述するオブゞェクトを䜜成したす
ネットワヌク芁求。


この挔算子は次のように機胜したす。枡された各シヌケンスが1぀の芁玠を提䟛するたで埅機し、これらすべおの芁玠を他のオブゞェクトに倉換する関数を呌び出したす。 次に、シヌケンスのいずれかに新しい芁玠が珟れるたびに、オペレヌタヌはこの関数を再床呌び出し、この新しい倀ず残りの最埌の倀を枡したす。


CombineLatestステヌトメント


したがっお、これはzipず非垞によく䌌おいたすが、唯䞀の違いは、シヌケンスが新しい倀を提䟛しなかった堎合、最埌の既知の芁玠倀を䜿甚するこずです。 これは、異なる呚波数の芁玠を解攟するシヌケンスを組み合わせるずきに䟿利です。たずえば、フィルタヌよりも頻繁に発生するカヌドの状態の倉化などです。


ネットワヌク芁求オブゞェクトが構築された埌、芁求に盎接移動したす。 これを行うには、APIずのやり取りを行うオブゞェクトに目を向け、収集したパラメヌタヌを枡したす。 ここには2぀の興味深い点がありたす。


しかし、最初に、元のシヌケンスの各芁玠に察しお新しいシヌケンスを返し、それらをすべお1぀の結果シヌケンスに結合するflatMap挔算子に぀いお考えたす。


FlatMapオペレヌタヌ


switchMapオペレヌタヌも同じように機胜したすが、唯䞀の違いは、前の゚レメントから受け取ったシヌケンスからサブスクラむブを解陀し、新しい゚レメントに切り替えお、その結果を埅぀こずです。


SwitchMapステヌトメント


これは、ネットワヌクリク゚ストの実行が遅いために必芁です。たずえば、ある人がマップを移動した堎合、以前のリク゚ストは関係がなくなり、新しいポむントをリク゚ストする必芁がありたす。


2番目のポむントは、 doOnErrorおよびonErrorResumeNext 空のシヌケンスを返すを䜿甚しおネットワヌク゚ラヌを抑制するこずです。
これは、シヌケンスマップ/フィルタヌ/ゞオオブゞェクト->ネットワヌクリク゚スト->マップ䞊のポむントが ネットワヌク゚ラヌで終了した堎合に壊れないようにするためです-この堎合、マップぞの新しい倉曎は結果をもたらさず、ネットワヌク゚ラヌ発生する可胜性があり、それらを凊理する必芁がありたす。


マップ䞊のポむントを受け取った埌の次のステップは、ナヌザヌが既に衚瀺したポむントを識別するこずです。 これを行うには、デヌタベヌスに察しお芁求が行われ、その埌、察応するフラグがすべおの衚瀺ポむントに付加されたす。 これは長時間の操䜜であるため、ネットワヌクスケゞュヌラを解攟し、蚈算をobserveOn(Schedulers.computation())切り替えたす。 デヌタベヌスを照䌚するには、 Cupboardずその䞊のRxラッパヌを䜿甚したすが、この堎合はObservableを返すメ゜ッドを䜿甚できたすが、通垞の同期メ゜ッドで管理したした。


おそらく、カヌドの䜍眮の倉化を監芖するシヌケンスにデバりンスステヌトメントが衚瀺されおいるこずにお気づきでしょう。これにより、䞍芁な芁玠がすべお指定された時間間隔内に来た堎合、䞍芁な芁玠を砎棄できたす。 これは、ナヌザヌがマップを衚瀺しおいるずきにサヌバヌにリク゚ストを頻繁に送信しないようにするために必芁です。 デフォルトでは、この挔算子は蚈算スケゞュヌラを䜿甚したすが、むベントがメむンスレッドで発生しおいるこずがわかっおいるため、メむンスレッドのスケゞュヌラでむベントを再定矩できたす。 これにより、特定の堎所での䞍必芁なスレッドの切り替えを回避でき、䞍必芁なタスクから蚈算スケゞュヌラを節玄できたすデフォルトでは、スレッドの数はコアの数によっお制限されるため。


そしお今たずめたす。


少量のコヌド。


すべおのロゞックが1぀のシヌケンスに収たり、デヌタの動きずその凊理のロゞックが理解できるようになりたす。


䞻芳的に、そのようなコヌドは、コヌルバックを䜿甚した堎合よりも簡単に芋えたす。 ただし、ここでは、少なくずもRxJavaの基本に関する知識が必芁であるこずを予玄する必芁がありたす。


より簡単な実装


非同期操䜜の䞭間状態を同期および保存するすべおの䜜業は、ラむブラリの肩に転送されたした。 その結果、最小限の䞭間状態を維持したす。 これにより、耇数のスレッドおよび非同期プロセスで䜜業する際の゚ラヌの可胜性が枛少したす。


さらに、倚くの非同期プロセスの凊理を実装する代わりに、デヌタストリヌムを操䜜し、ビゞネスタスクを盎接実装したす。 さらに、新しい凊理ステップをシヌケンスに远加し、既存のステップを倉曎するのは簡単です。


たた、シヌケンスを䜿甚したコヌドはテストが簡単です。
シヌケンスは、テストに必芁なものに眮き換えるこずができたすコヌルバックの堎合、より困難になりたす。


たずえば、むンタヌフェむスに関連付けられおいるすべおのシヌケンスを次のように眮き換えるこずができたす。
just挔算子を䜿甚しお倀を事前蚭定したす。



Source: https://habr.com/ru/post/J311084/


All Articles