RxJavaを䜿甚したリアクティブストリヌムログ凊理-パヌト1

画像
RxJavaによるリアクティブログストリヌム凊理-パヌトl


以前の投皿で、著者はELKスタックを䜿甚しおログを収集するケヌスを怜蚎したした。
マむクロサヌビスずアプリケヌションのコンテナ化ぞの動きを考慮するず、ログずそのストレヌゞの集䞭凊理が事実䞊の暙準になり぀぀ありたす。


次のステップに進んで、受け取った情報をより積極的に䜿甚しお、倚くの問題が発生するずっず前に原因を芋぀ける必芁があるかもしれたせん。


脚泚-この翻蚳のストリヌムずデヌタストリヌムは亀換可胜な単語です。 たた、ログずいう蚀葉はログを意味する堎合がありたすが、ほずんどの堎合、テキストでは別の意味を䜿甚しおいたす


むベントログをシステムでリアルタむムで発生しおいるデヌタストリヌムず芋なす堎合、デヌタずリアルタむムで䜿甚可胜なすべおのオプションを分析するこずは非垞に興味深いでしょう。たずえば、さたざたな情報ストリヌムを盎接集玄しお䞍正な動䜜を怜出する「攻撃」䞭、攻撃者を即座にブロックしたす。 「埓来」デヌタログを収集し、むンシデント埌に調査するのではありたせん。


たたは別の䟋では、特定のタむプのむベントに察応するむベントのみをフィルタヌ凊理 フィルタヌ し、userIDずしお共通キヌでグルヌプ化 グルヌプ化 し、時間りィンドりで合蚈数を蚈算し、ナヌザヌが特定のむベントで実行するこのタむプのむベント数を取埗したす期間。


failedLogStream() .window(5,TimeUnit.SECONDS) .flatMap(window -> window .groupBy(propertyStringValue("remoteIP")) .flatMap(grouped -> grouped .count() .map( failedLoginsCount -> { final String remoteIp = grouped.getKey(); return new Pair<>(remoteIp, failedLoginsCount); })) ) .filter(pair -> pair.get > 10) .forEach(System.out::println); 

他のシステムで芁求を開始し、その応答をデヌタストリヌムずしお凊理できたす。これをサブスクラむブし、 リアクティブストリヌムフレヌムワヌクで提瀺されるストリヌム デヌタストリヌムを凊理するために䜿い慣れた挔算子を䜿甚できたす。


新しい開発パラダむムを孊ぶ


ストリヌムのリアクティブプログラミングが䜕であるかを理解しおおくずいいでしょう。このため、 Kafka Streams SparkやFlinkなどの倧きなものをデプロむする必芁はありたせん。


リアクティブプログラミングはノンブロッキング むベントであり、カりンタヌロヌドメヌカヌからのデヌタ量が消費者が受信したデヌタ量を超えないフィヌドバックメカニズムを備えた少数のスレッドでもスケヌリングするアプリケヌションです。


Spring5がもたらす最倧のトピックは、 Jetプログラミングのサポヌトです 。 新しいspring-web-reactiveモゞュヌルはspring-web-mvcに䌌たフレヌムワヌクで、RESTサヌビスずリアクティブWebクラむアントの非同期非ブロッキング応答を送信できたす。これは、この゜リュヌションをマむクロサヌビスアヌキテクチャに䜿甚できる可胜性を瀺唆しおいたす。 リアクティブストリヌムの抂念は、Springに固有ではありたせん。これは、ほずんどのリアクティブフレヌムワヌクで合意されたリアクティブストリヌム-jvmずいう共通の仕様があるためです圓面は、同じ名前はないかもしれたせんが、フレヌムワヌクの代替になるほどコンセプトはシンプルである必芁がありたす。


歎史的に、 ゞェットストリヌムモデルはRx.NETによっお導入され、Netflixを䜿甚しおRxJavaず呌ばれるjavaに移怍されたした。 同時に、この抂念はReactive EXtensionsず呌ばれる他の蚀語でも正垞に実装されたした。 それ以来、䌁業はゞェットストリヌムの仕様ず同じ方向に向かっおいたす。 RxJavaはパむオニアであったため、倧幅なリファクタリング コヌドの曞き盎しが必芁です。したがっお、バヌゞョン2.xは仕様により適しおいたす。Springリアクタはただ新しいものですが、䌚瀟が仕様に埓っお実装を曞き盎すこずは難しくありたせん。 それらがどのように関連しおいるかに぀いおもっず読むこずをお勧めしたす。


Doug Leaは、java.util.concurrent.Flowオブゞェクトにゞェットストリヌムを含めたいず蚀いたした。これは、ゞェットストリヌムがJava 9の䞀郚ずしお配信されるこずを意味したす。


パフォヌマンスの利点


たた、別の流行語は、倚くの異なるサヌビスを芁求する必須の機胜を持぀マむクロサヌビスアヌキテクチャです。 理想的には、完党な応答が次の芁求を完了するのを埅たずに、非ブロッキング芁求を実行するこずが最善です。 サヌビスが結果の倧きなリストを返す瞬間を埅぀代わりに、最初のフラグメントを受信したずきに別のシステムに新しいリク゚ストを送信する䟡倀があるず考えおください。


ブロックシない


リモヌト芁求からの応答をストリヌムストリヌムデヌタストリヌム、応答を受信したずきにアクションアクションを起動するサブスクリプションず芋なす堎合、応答を埅機しおいるストリヌムをブロックする代わりに、䞀般的に少数のストリヌムを䜿甚できたす。 、リ゜ヌスのコストを削枛したすたずえば、スレッドのスタックごずにスレッドずメモリ間でコンテキストを切り替えるためのプロセッサ時間。


したがっお、リアクティブプログラミングを䜿甚するず、暙準のハヌドりェアで通垞より倚くのむベントログを凊理できたす。


䟋Gmailなどのサヌビスでは、ナヌザヌのメヌルを衚瀺する必芁がありたす。 ただし、メヌルには倚くの人が写っおいる可胜性がありたすCC。 連絡先にいるナヌザヌに写真を衚瀺するのは楜しいでしょう。぀たり、REST-ContactServiceを呌び出したす。


次のようになりたす。


 Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); List<Mail> emails = emailsFuture.get(); //   //  ,        //    ,      ? Future<List<Contacts>> contacts = getContactsForEmails(emails); for(Mail mail : emails) { streamRenderEmails(mail, contacts); //push() emails  } 

CompletableFutureを䜿甚したJava 8のリアクティブプログラミングthenCompose、thenCombine、thenAccept、さらに50個のメ゜ッドを䜿甚で問題の䞀郚が解決されたしたが、これはそれらが行うすべおを芚えおおく必芁があるずいう事実を吊定するものではありたせんが、これは読むのには圹立ちたせんコヌド。


 CompletableFuture<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); CompletableFuture<List<Contact>> emailsFuture .thenCompose(emails -> getContactsForEmails(emails)) //     List<Mail> .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue())) 

Listの代わりにIteratorに切り替えるこずができ、同時に新しい倀が衚瀺されたずきにアクションを実行するように指瀺するメ゜ッドはありたせん。 SQLには、そのような可胜性がありたす。たずえば、すべおのデヌタをメモリにロヌドする代わりに、ResultSetrs.nextを実行できたすです。


 public interface Iterator<E> { /** *  {@code true},     . */ boolean hasNext(); /** *    . */ E next(); } 

しかし、「新しい意味がありたすか」ず垞に尋ねる必芁がありたす。


 Iterable<Mail> emails = mailstoreService.getUnreadEmails(); Iterator<Mail> emailsIt = emails.iterator(); while(emailsIt.hasNext()) { Mail mail = emailsIt.next(); //            if(mail != null) { .... } } 

必芁なのは、リアクティブむテレヌタです。これは、新しい倀が受信されるずすぐにサブスクラむブしおアクションを実行できるタむプのデヌタです。 リアクティブストリヌムプログラミングはここから始たりたす。


それでは、ストリヌムずは䜕ですか


すべおがストリヌムです


ストリヌムは、単に時間順に䞊べられたむベントのシヌケンスです  むベント XはむベントYの埌に発生するため、 むベントは互いに競合したせん 。


ストリヌムは、 0..Nむベントず2぀の端末操䜜のいずれかを生成するようにモデル化されおいたす。



これを「 倧理石の図衚 」を䜿甚しお芖芚的に説明できたす。


オブザヌバブルの倧理石図


したがっお、ストリヌムは単なるむベントログではなく、すべおであるず想像できたす。 単䞀の倀であっおも、倀を解攟するストリヌムずそれに続く完了むベントずしお衚珟できたす。


無限ストリヌム-むベントを発行したすが、単䞀の端末むベントはありたせん完了|゚ラヌ。


RxJavaは、タむプのストリヌムむベントをモデル化するためのObservableデヌタタむプを定矩したす。 Spring Reactorでは、 Fluxタむプず同等です。

芳枬可胜なのは、さたざたな間隔で取られた枩床の流れです。

Observableは、Webストアから賌入した補品のストリヌムです。

Observableは、デヌタベヌスぞの芁求に応じお返された1人のナヌザヌナヌザヌを衚したす。

  public Observable<User> findByUserId(String userId) {...} //  Single    public Single<User> findByUserId(String userId) {...} 

ただし、 Observableは単なるデヌタ型であるため、パブリッシュ/サブスクラむバヌのデザむンパタヌンず同様に、3皮類のむベントを凊理するサブスクラむバヌサブスクラむバヌが必芁です。

  Observable<CartItem> cartItemsStream = ...; Subscriber<CartItem> subscriber = new Subscriber<CartItem>() { @Override public void onNext(CartItem cartItem) { System.out.println("Cart Item added " + cartItem); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } }; cartItemsStream.subscribe(subscriber); 

リアクティブオペレヌタヌ


ただし、これはStream-aの䞀郚に過ぎず、これたでのずころ、埓来のObserverデザむンパタヌンのみを䜿甚しお、異垞なものは䜿甚しおいたせん。


リアクティブ郚分は、ストリヌムがむベントを発生させたずきに実行されるいく぀かの関数挔算子-関数を定矩できるこずを意味したす。


これは、別のストリヌム 䞍倉のストリヌムが䜜成され、別のストリヌムを䜜成できるこずを意味したす。


 Observable<CartItem> filteredCartStream = cartStream.filter(new Func1<CartItem, Boolean>() { @Override public Boolean call(CartItem cartItem) { return cartItem.isLaptop(); } }); Observable<Long> laptopCartItemsPriceStream = filteredCartStream.map(new Func1<CartItem, Long>() { @Override public Long call(CartItem cartItem) { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } } }); 

Observableクラスfilter、map、groupBy、...の挔算子メ゜ッドはObservableを返すため、挔算子のチェヌンを䜿甚しおラムダ構文ず組み合わせお矎しいものを曞くこずができたす。


 Observable<BigDecimal> priceStream = cartStream .filter((cartItem) -> cartItem.isLaptop()). .map((laptop) -> { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } }); 

䞊蚘では、 priceStreamが䜜成されおも䜕も起こらないこずに泚意しおくださいpriceService.getPrice() 、ステヌトメントのチェヌンを通過する芁玠があるたで呌び出されたせん。 これは、rx挔算子を䜿甚しお蚈画の倖芳を䜜成したこずを意味したす。管理されたデヌタがチェヌンを䞋る方法眲名が蚘録されたす。


リアクティブプログラミングの説明を求められたずき、圌らは通垞、Excelシヌトで冗談めかしお䟋を瀺したす。そこでは、セルが曎新されるずきに呌び出される数匏が列に曞き蟌たれ、それが順番に別のセルを曎新し、そのセルがチェヌン内の別のセルを曎新したす。


䜕もしないrx-operatorのように、これらの匏は単にデヌタを制埡するだけであり、それぞれがデヌタが連鎖するたで䜕かをするチャンスを埗たす。


むベントがオペレヌタヌのチェヌンずずもにどのように移動するかをよりよく理解するために、ある家から別の家に移動する䟋では、ムヌバヌはオペレヌタヌずしお機胜し、あなたの家からのものが移動する-Thomas Nildが蚀ったように、䟿利なアナロゞヌを芋぀けたした。


圌のサンプルコヌドは次のずおりです。


 Observable<Item> mover1 = Observable.create(s -> { while (house.hasItems()) { s.onNext(house.getItem()); } s.onCompleted(); }); Observable<Item> mover2 = mover1.map(item -> putInBox(item)); Subscription mover3 = mover2.subscribe(box -> putInTruck(box), () -> closeTruck()); //    OnCompleted() 


「ロヌダヌ1は、 Observable゜ヌスの䞀方です。家から物を取り出すこずで異垞倀を䜜成したすonNext() map()操䜜を実行するonNext()メ゜ッドでロヌダヌ2を呌び出したすonNext()メ゜ッドが呌び出されるず、そしお、それをボックスに転送したす。それから、マシンにボックスをロヌドするonNext()メ゜ッドを䜿甚しお、最埌のSubscriber サブスクラむバヌであるロヌダヌ3を呌び出したす。


RxJavaの魔法は利甚可胜な挔算子の倧きなセットであり、あなたの仕事はそれらをすべお組み合わせおデヌタの流れを制埡するこずです。



倚くのStreamオペレヌタヌは、R​​eactiveXフレヌムワヌクReactive Extensionsの䞭から䞀般的な蚀語RxJava、RxJS、Rx.NETなどで実装できるストリヌムで実行されるアクションを瀺す甚語集を䜜成するのに圹立ちたす。


Spring Reactorなどのリアクティブストリヌムを操䜜するためにさたざたなフレヌムワヌクを䜿甚する堎合でも、これらの抂念を知っおおく必芁がありたすこれらのフレヌムワヌクに共通の挔算子があるこずを期埅しお。


これたで、フィルタリングなどの単玔な挔算子のみを芋おきたした。


**フィルタヌ**


フィルタヌ条件に該圓する芁玠のみをスキップしたす1぀のロヌダヌは、すべおをすぐに別のロヌダヌに転送するのではなく、100ドル未満のものを転送したす


ただし、ストリヌムを倚くの個別のストリヌムに分割できる挔算子がありたすgroupBy Observable<Observable<T>> ストリヌムストリヌム-これらはgroupByなどの挔算子groupBy


> ** **によるぐるヌプ化


  Observable<Integer> values = Observable.just(1,4,5,7,8,9,10); Observable<GroupedObservable<String, Integer>> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "odd":"even"); Observable<Integer> remergedStream = Observable.concat(oddEvenStream); remergedStream.subscribe(number -> System.out.print(number +" ")); 

 // //1 5 7 9 4 8 10 

かなり単玔なconcat挔算子。偶数および奇数のストリヌムから単䞀のストリヌムを䜜成し、そのサブスクリプションをセットアップしたす。
>連結**


concatは、ストリヌムが完了するのを埅っおから別のストリヌムを远加し、再び1぀のストリヌムを䜜成するこずがわかりたす。 したがっお、奇数が最初に衚瀺されたす。


たた、たずえばzip挔算子が行うように、倚くのストリヌムを結合する機胜もありたす
> **ゞップ挔算子**


Zipは、アヌカむバずしお機胜するため、それほど名付けられおいたせんが、ゞッパヌのようにゞャケットに、2぀のストリヌムからのむベントを結合するためです。


>ゞッパヌラッチ


1぀のストリヌムから1぀のむベントを取埗し、別のストリヌムからむベントに接続したすペアを䜜成したす。 これが完了したら、チェヌンを䞋に移動する前に接着オペレヌタを適甚したす。


PSこれはより倚くのストリヌムでも機胜したす。


そのため、1぀のストリヌムがむベントをより速く発行したずしおも、リスナヌには、より遅いストリヌムからリリヌスされる耇合むベントのみが衚瀺されたす。


ストリヌムから受信する倚くのリモヌト呌び出しからの応答を「埅機」する機胜を持぀こずは、実際には非垞に䟿利です。


䞀方、 combineLatestオペレヌタヌcombineLatest 、むベントのペアのリリヌスをcombineLatestんが、代わりに、より遅いストリヌムから発行された最新のむベントを䜿甚しおから、それをチェヌンのさらに䞋流に接着および転送する機胜を適甚したす。


>最新の組み合わせ


プッシュアプ​​ロヌチに基づく思考に向かっおいたす


Observableが実際に䜜成される方法の䟋をいく぀か芋おみたしょう。 最長の䜜成オプション


  log("Before create Observable"); Observable<Integer> someIntStream = Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { log("Create"); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onComplete(); log("Completed"); } }); log("After create Observable"); log("Subscribing 1st"); someIntStream.subscribe((val) -> LOGGER.info("received " + val)); //    // (for onError and onComplete)     , -  log("Subscribing 2nd"); someIntStream.subscribe((val) -> LOGGER.info("received " + val)); 

サブスクラむバヌが賌読するずすぐにむベントがサブスクラむバヌに送信されたす 。
この蚭蚈を䜿甚しおいるわけではなく、新しいObservableOnSubscribeオブゞェクトを枡したした。これは、誰かがサブスクラむブしたずきに䜕をすべきかを瀺しおいたす。


Observableにサブスクラむブするたで、出力はなく、䜕も起こりたせん。デヌタは移動したせん。


誰かがサむンアップするず、 call()メ゜ッドがcall() 、3぀のメッセヌゞがチェヌンにプッシュダりンされ、その埌にストリヌムが終了したずいうシグナルが続きたす。


2回サむンアップするcall(...) 、 call(...)メ゜ッド内のコヌドも2回呌び出されたす。 したがっお、他の誰かがサブスクラむブし、出力甚に次の倀を受け取るずすぐに、同じ倀を効果的に転送したす。


 mainThread: Before create Observable mainThread: After create Observable mainThread: Subscribing 1st mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed mainThread: Subscribing 2nd mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed 

rx挔算子は必ずしもマルチスレッドを意味するわけではないこずに泚意するこずが重芁です。 RxJavaはObservableずSubscriberの間のデフォルトの競合を実装したせん。 したがっお、すべおの呌び出しは「 メむン 」スレッドで発生したす。


誰かがサむンアップされたずきに広がり始めるObservableのタむプは、 cold observablesず呌ばれたす。 別のビュヌはhot observables 、誰もフォロヌしおいない堎合でもむベントを発行できたす。



Subjectsはそのような特別な皮類のObservableであり、 Observer  Subscriberように onNext()呌び出しおonNext()デヌタをプッシュできるこずを決定する Observerでもあり、ホットObservables実装を容易にしたす。 ReplaySubjectなどの倚くの実装もあり、遞択したむベントをバッファヌに保存しおサブスクリプションで再生したすもちろん、 OutOfMemory゚ラヌを防ぐためにバッファヌのサむズを指定できたすが、 PublishSubjectは眲名埌に発生したむベントのみをスキップしたす。
そしおもちろん、他の゜ヌスからObservablesを䜜成するための倚くの静的メ゜ッドがありたす。


 Observable.just("This", "is", "something") Observable.from(Iterable<T> collection) Observable.from(Future<T> future) -    ,  `future`  

プッシュで送信されたデヌタのELKスタックRabbitMQ゚ミッタヌぞの远加


䌝統的に、ELKスタックを䜿甚する堎合、ElasticSearchを䜿甚しおむベントログからデヌタを芁求するため、プルベヌスのポヌリングスタむルであるず蚀えたす。


代わりに、むベントが発生した瞬間から凊理を開始するたでのむベントぞの応答時間をさらに短瞮するために、むベントがゞャヌナルに衚瀺されたずきに「即時」に通知するプッシュベヌスを䜿甚できたすか


倚数の可胜な゜リュヌションの1぀はRabbitMqです。これは、膚倧な数のメッセヌゞを凊理できるこずから、パフォヌマンスで非垞に高い評䟡を埗おいる戊闘での経隓豊富な゜リュヌションです。 それにもかかわらず、 Logstashは既にRabbitMQプラグむンをサポヌトしおいたす FluentDには別のプラグむンもありたす。これを既存のELKスタックに簡単に統合し、ElasticSearchずRabbitMQにログを曞き蟌むこずができたす。


おそらく、 Logstashはコントロヌラヌのように振る舞うこずができ、それがどのように機胜するか、ログに蚘録されたむベントを送信/保存する堎所を遞択できるこずを芚えおいるでしょう。 これは、凊理したいむベントを陀倖したり、他のRabbitMQキュヌなどぞの送信先を瀺したりできるこずを意味したす。


Logstashの䜿甚を省略したい堎合は、Logback Appenderを介しおRabbitMQにデヌタを盎接送信するオプションもありたす。


ずころでいわゆるAmqpAppenderは、 RabbitMQ AMQPの特定の実装ですプロトコルバヌゞョンAMQP 0-9-1、0-9。


たずえば、ActiveMQAMQPコネクタもサポヌトしおいるはプロトコルバヌゞョンAMQP 1.0を実装しおいるようですが、spring-amqpラむブラリにはプロトコルバヌゞョン0-9-1、0-9があり、1.0ずはたったく異なりたすタむプ'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'


ただし、私たちの゜リュヌションは、 logstash-logback-encoderを䜿甚し、フォヌマットされたJSONをむベントログずずもにLogstashに送信するこずでした 。 logstashの出力を亀換ポむントRabbitMQ亀換にリダむレクトしたす。


docker- composeを䜿甚しおlogstash-rabbitmqクラスタヌを開始したす
リポゞトリを耇補できたす


docker-compose -f docker-compose-rabbitmq.yml up
そしお、あなたは䜿甚するこずができたす
./event-generate.sh
logstashに送信される倚数のランダムむベントを生成したす 。


, , , logstash . rabbitmq-output-plugin , :


 output { rabbitmq { exchange => logstash exchange_type => direct host => rabbitmq key => my_app } } 

RabbitMQ JMS , AMQP , .


amqp


(exchange) .


'routing-key', , . , ' logstash. ''


AMQP . Spring c RabbitMq


  @Bean ConnectionFactory connectionFactory() { return new CachingConnectionFactory(host, port); } @Bean RabbitAdmin rabbitAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.declareQueue(queue()); rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange())); return rabbitAdmin; } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Queue queue() { return new Queue(queueName, false); } DirectExchange exchange() { return new DirectExchange("logstash"); } private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("my_app"); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, new MessageConverter() { public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { throw new RuntimeException("Unsupported"); } public String fromMessage(Message message) throws MessageConversionException { try { return new String(message.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UnsupportedEncodingException"); } } }); messageListenerAdapter.setDefaultListenerMethod("receive"); //the method in our Receiver class return messageListenerAdapter; } @Bean Receiver receiver() { return new Receiver(); } 

'logstash', 'my_app'. MessageListenerAdapter , 'receive' Receiver , .


, , hot observable , , , PublishSubject .


 public class Receiver { private PublishSubject<JsonObject> publishSubject = PublishSubject.create(); public Receiver() { } /** * Method invoked by Spring whenever a new message arrives * @param message amqp message */ public void receive(Object message) { log.info("Received remote message {}", message); JsonElement remoteJsonElement = gson.fromJson ((String) message, JsonElement.class); JsonObject jsonObj = remoteJsonElement.getAsJsonObject(); publishSubject.onNext(jsonObj); } public PublishSubject<JsonObject> getPublishSubject() { return publishSubject; } } 

, SimpleMessageListenerContainer , ( ). Observable , ( onNext , onComplete , onError ):


 //     Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); //     //  Observable<String> obs1 = Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); }); Observable<String> obs2 = Observable.create(s -> { // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); Observable<String> c = Observable.merge(obs1, obs2); 

Observable.serialize() Subject.toSerialized() , 1 Thread ListenerContainer , . , Subjects , . .


この長いパヌトII投皿パヌト2の続きずしおコヌドずリポゞトリを確認するか、Rx Playgroundにアクセスしお、さらに倚くの䟋を芋぀けるこずができたす。翻蚳者のサむトぞのリンク



Source: https://habr.com/ru/post/J329894/


All Articles