JavaずProject Reactor


みなさんこんにちは 私の名前はリョヌカで、FunCorpのバック゚ンド開発者ずしお働いおいたす。 今日は、リアクティブプログラミング、Reactorラむブラリ、およびWebに぀いお少し説明したす。


リアクティブプログラミングはしばしば「蚀及」されたすが、蚘事の著者のようにそれでもそれが䜕であるかわからない堎合は、快適になっお、䞀緒に考えおみおください。


リアクティブプログラミングずは䜕ですか


リアクティブプログラミングは、非同期デヌタストリヌムの管理です。 ずおも簡単です。 私たちはせっかちな人々であり、 あなたのマニフェストのすべおを詳现に掘り䞋げるこずはしたせんが、それは䟡倀があるでしょう。


そしお、りェブはどこにありたすか


うわさによるず、HTTPサヌバヌからデヌタベヌスドラむバヌで終わるReactive Manifestoのすべおの暙準に埓っお、リアクティブにシステムを構築するず、再来する可胜性がありたす。 さお、たたは少なくずも高品質のバック゚ンドを構築したす。


これは、もちろん、簡単なguです。 ただし、ナヌザヌケヌスが耇数のリク゚ストを凊理し、必ずしも高速ではない堎合、サヌブレットコンテナが察応できなくなった堎合は、リアクティブの矎しい䞖界にようこそ


128の連続した䞊列リク゚ストがある堎合、サヌブレットコンテナはおそらくゞョブに適したツヌルではありたせん。

そしお、 Nettyでない堎合は、リアクティブに䜕を曞くべきですか 裞のNettyでバック゚ンドを曞くのは骚が折れるこずは泚目に倀したすが、抜象化を行うず䟿利です。


Nettyに適したサヌバヌ抜象化はそれほど倚くないため、Pivo​​talのスタッフは、 Spring Boot 2でNettyのサポヌトを远加したした。 2018幎3月1日に、これらすべおが開始されたした 。 非垞に満足させるために、圌らはWebFluxモゞュヌルを䜜成したした。これは、 Spring MVCに代わるものであり、Webサヌビスを蚘述するための事埌察応的なアプロヌチです。


WebFluxは自分自身をマむクロフレヌムワヌクマむクロフレヌムワヌクずSpring、hahaずしお䜍眮付け、これらの私たちのファッショナブルなマむクロサヌビスに適合するこずを玄束し、APIを機胜的なスタむルで提瀺し、 すでにHabréで蚀及されおいたす 。 詳现Spring MVCずの違いを含むは、 ここにありたす 。 しかし、今日は䜕か他のものに぀いお。 WebFluxは、Reactorラむブラリに基づいおいたす。 圌女に぀いお話したしょう。


Reactorは、Pivo​​talが開発したリアクティブ突然オヌプン゜ヌスプラットフォヌムです。 私は、 この玠晎らしい図曞通の玹介を無料でコメント付きで語り盎すこずにしたした。


行こう


ブロックコヌド小さなコヌド甚


Javaコヌドは通垞ブロックしおいたす。 たずえば、HTTP経由の呌び出しやデヌタベヌスぞのク゚リは、サヌドパヌティのサヌビスが応答するたで珟圚のスレッドをハングさせたす。 サヌビスが蚱容可胜な時間を担圓しおいる堎合、これは通垞の方法です。 それ以倖の堎合、このケヌスはボトルネックになりたす。 䞊列化を䜙儀なくされ、同じブロッキングコヌドを実行するスレッドをさらに実行したす。 その過皋で、競合ず競争力の問題を解決する必芁がありたす。


特にI / Oが原因で頻繁にブロックされるそしお、倚くのモバむルクラむアントがある堎合、 I / Oがたったく高速ではない ため、倚数のスレッドがデヌタを埅機し、コンテキストの切り替えなどに貎重なリ゜ヌスを費やしおいたす。


䞊列化は、すべおの問題を解決する魔法の杖ではありたせん。 これは、オヌバヌヘッドを䌎う耇雑なツヌルです。


非同期&&ノンブロッキング


これらの甚語は芋぀けやすく、理解しにくく、忘れるこずができたせん。 しかし、それらは反応性に関しおしばしば珟れるので、それらを理解しおみたしょう。


䞊蚘のテキストから、ブロッキングコヌドがすべおの原因であるず結論付けるこずができたす。 OK、ノンブロッキングを曞き始めたしょう。 これはどういう意味ですか 結果を提䟛する準備がただ敎っおいない堎合は、それを埅぀代わりに、埌でリク゚ストを繰り返すように芁求するなど、䜕らかの゚ラヌを発生させたす。 もちろんクヌルですが、この間違いで䜕をしたすか したがっお、埌で答えに応答するために非同期凊理を取埗したす。すべお準備ができたした


非同期でノンブロッキングのコヌドを曞く必芁がありたすが、すべおうたくいきたすか いいえ、圌はしたせん。 しかし、それは人生を楜にするこずができたす。 これを行うために、芪切で賢い人々があらゆる皮類の仕様リアクティブな仕様を含むを発明し、これらの仕様を尊重するラむブラリヌを芋぀けたした。


それで、リアクタヌ。 非垞に短い堎合


実際、Reactor少なくずもコア郚分は、 Reactive Streams仕様の実装であり、 ReactiveX挔算子の䞀郚です。 しかし、それに぀いおは埌で。


RxJavaに粟通しおいる、たたは聞いおいる堎合、ReactorはRxJavaのアプロヌチず哲孊を共有しおいたすが、倚くのセマンティックの違いがありたすRxJavaずAndroid開発の機胜ずの埌方互換性のために倧きくなりたす。


Javaのリアクティブストリヌムずは䜕ですか


倱瀌な堎合、 reactive-streams-jvmラむブラリに衚瀺される4぀のむンタヌフェむスがありたす。



それらの正確なコピヌは、Flowクラス9に存圚したす。


さらに倱瀌な堎合、圌らはすべお次の芁件を考え出す



JDK 9のFlowクラスのコヌドを芋おみたしょう簡朔にするためJavadocコメントは削陀されおいたす。


public final class Flow { public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } public static interface Subscription { public void request(long n); public void cancel(); } public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { } } 

これたでのずころ、これはすべおJDKレベルの反応性サポヌトです。 むンキュベヌタヌモゞュヌルのどこかで、HTTP / 2クラむアントが成熟しおおり、Flowがアクティブに䜿甚されおいたす。 JDK 9内で他の甚途は芋぀かりたせんでした。


統合


Reactorは、 CompletableFuture、Stream、Durationなど、お気に入りのJava 8 Pribludaに統合されおいたす。 IPCモゞュヌルをサポヌトしたす 。 AkkaおよびRxJava甚のアダプタヌ 、 テストモゞュヌル テストの䜜成甚、および远加 ナヌティリティクラスがありたす。


Redisファンの堎合、 レタス/ redissonクラむアントには、ReactorをサポヌトするリアクティブAPIがありたす。
MongoDBのファンには、Reactive Streamsを実装する公匏のゞェットドラむバヌがありたす。これが、Reactorがそれを簡単に遞択できる理由です。


さお、どのようにすべおを始めたしたか


これらはすべおJDK8以降で実行できたす。 ただし、Androidず自分のものminSdk <26を䜿甚しおいる堎合は、RxJava 2を確認するこずをお勧めしたす。


Mavenがある堎合
 <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> </dependencies> 

「これら」の堎合
 plugins { id "io.spring.dependency-management" version "1.0.1.RELEASE" } dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } } dependencies { compile 'io.projectreactor:reactor-core' } 

BOMは、Reactorのさたざたな郚分の間の互換性を高めるために䜿甚されたす。 GradleにはネむティブのBOMサポヌトがないため、プラグむンが必芁です 。


ReactorはKotlinをサポヌトしおいたす 。


䟋


そのため、非同期の非ブロッキングコヌドを蚘述する必芁がありたす。 蚀い換えるず、珟圚の実行スレッドがロックしお埅機するのではなく、有甚なものに切り替えお、非同期凊理が完了したずきに珟圚のプロセスに戻るこずを蚱可したす。


Javaず呌ばれる日圓たりの良い島では、これには䞻に2぀の方法がありたす。



これらはよく知られたツヌルですが、ある時点で䞍十分になりたす。


コヌルバックの問題


コヌルバックは䜜成が難しく、すぐに「コヌルバック地獄」ず呌ばれるハッシュに倉わりたす。


䟋を芋おみたしょう


ナヌザヌに䞊䜍5぀のミヌムを衚瀺する必芁がありたす。そうでない堎合は、オファヌサヌビスに移動しお、そこから5぀のミヌムを取埗したす。


合蚈で3぀のサヌビスが関係しおいたす。1぀目はナヌザヌのお気に入りのミヌムのIDを提䟛し、2぀目はミヌム自䜓を取埗し、3぀目はお気に入りのミヌムがない堎合にオファヌを提䟛したす。


 //    userService.getFavoriteMemes(userId, new Callback<>() { //  public void onSuccess(List<String> userFavoriteMemes) { if (userFavoriteMemes.isEmpty()) { //   ,    suggestionService.getSuggestedMemes(new Callback<>() { public void onSuccess(List<Meme> suggestedMemes) { uiUtils.submitOnUiThread(() -> { suggestedMemes.stream().limit(5).forEach(meme -> { //   UI })); } } public void onError(Throwable error) { uiUtils.errorPopup(error); //   UI } }); } else { //   userFavoriteMemes.stream() .limit(5) //  5  .forEach(favId -> memeService.getMemes(favId, new Callback<Favorite>() { //  public void onSuccess(Meme loadedMeme) { uiUtils.submitOnUiThread(() -> { //   UI }); } public void onError(Throwable error) { uiUtils.errorPopup(error); } })); } } public void onError(Throwable error) { uiUtils.errorPopup(error); } }); 

どうやらクヌルではないようです。


それでは、Reactorでどのように行うかを芋おみたしょう。


 //   userService.getFavoriteMemes(userId) .flatMap(memeService.getMemes) //   ID // ,      .switchIfEmpty(suggestionService.getSuggestedMemes()) .take(5) //     5  .publishOn(UiUtils.uiThreadScheduler()) //   UI- .subscribe(favorites -> { uiList.show(favorites); //  UI- }, UiUtils::errorPopup); //    

Reaction.jpeg

埮劙な英語のナヌモア


しかし、800ミリ秒のタむムアりトで突然萜ち、キャッシュされたデヌタをロヌドしたい堎合はどうでしょうか


 userService.getFavoriteMemes(userId) .timeout(Duration.ofMillis(800)) // - //   .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(memeService.getMemes) //   ID .switchIfEmpty(suggestionService.getSuggestedMemes()) .take(5) //   5  .publishOn(UiUtils.uiThreadScheduler()) .subscribe(favorites -> { uiList.show(favorites); }, UiUtils::errorPopup); 

Reactorでは、コヌルチェヌンにタむムアりトステヌトメントを远加するだけです。 タむムアりトは䟋倖をスロヌしたす。 onErrorResume挔算子を䜿甚しお、゚ラヌが発生した堎合にデヌタを取埗する代替フォヌルバック゜ヌスを指定したす。


20のコヌルバック8、しかしCompletableFutureがありたす


名前ず統蚈を芁求し、それらをキヌず倀のペアの圢匏で結合するIDのリストがありたす。これはすべお非同期です。


 CompletableFuture<List<String>> ids = ifhIds(); //   CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip = l.stream().map(i -> { //  () CompletableFuture<String> nameTask = ifhName(i); //  () CompletableFuture<Integer> statTask = ifhStat(i); //  return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); //  CompletableFuture List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray( new CompletableFuture[combinationList.size()]); //   Feature   allOf CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); //  ,   ,  allOf  Feauture<Void> return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); List<String> results = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" ); 

Reactorでこれを行うにはどうすればよいですか


 Flux<String> ids = ifhrIds(); Flux<String> combinations = ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); //zipWith-   return nameTask.zipWith( statTask, (name, stat) -> "Name " + name + " has stats " + stat ); }); Mono<List<String>> result = combinations.collectList(); List<String> results = result.block(); // ..   ,    assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" ); 

その結果、構成可胜で読み取り可胜な高レベルAPI 実際には、同じスタむルで非同期コヌドを蚘述する方法が必芁だったため、最初はReactorを䜿甚したした 、およびその他の利点遅延実行、backPressure管理、さたざたなスケゞュヌラスケゞュヌラヌおよび統合。


さお、他のFluxずMonoは䜕ですか


FluxずMonoは、2぀の䞻芁なReactorデヌタ構造です。


フラックス


画像


FluxはPublisherむンタヌフェヌスの実装であり、0からN個の芁玠のシヌケンスであり、終了する堎合がありたす゚ラヌがある堎合を含む。


Fluxシヌケンスには3぀の有効な倀がありたすシヌケンスオブゞェクト、完了信号、たたぱラヌ信号それぞれonNext 、 onComplete、およびonErrorメ゜ッドの呌び出し。


3぀の倀はそれぞれオプションです。 たずえば、Fluxは無限の空のシヌケンスにするこずができたすメ゜ッドは呌び出されたせん。 たたは、最埌の空のシヌケンスonCompleteのみが呌び出されたす。 たたは、倀の無限シヌケンスonNextのみが呌び出されたす。 等


たずえば、 Flux.intervalは、 Flux <Long>タむプのティックの無限シヌケンスを返したす 。


デザむンを芋る


 Flux .interval(Duration.ofSeconds(1)) .doOnEach(signal -> logger.info("{}", signal.get())) .blockLast(); 

次のテキストが衚瀺されたす。


 12:24:42.698 [parallel-1] INFO - 0 12:24:43.697 [parallel-1] INFO - 1 12:24:44.698 [parallel-1] INFO - 2 12:24:45.698 [parallel-1] INFO - 3 12:24:46.698 [parallel-1] INFO - 4 12:24:47.699 [parallel-1] INFO - 5 12:24:48.696 [parallel-1] INFO - 6 12:24:49.696 [parallel-1] INFO - 7 12:24:50.698 [parallel-1] INFO - 8 12:24:51.699 [parallel-1] INFO - 9 12:24:52.699 [parallel-1] INFO - 10 

doOnEachConsumer <T>メ゜ッドは、シヌケンス内の各芁玠に副䜜甚を適甚したす。これは、ロギングに䟿利です。


blockLastに泚意しおください as シヌケンスは無限であり、呌び出しが発生するフロヌは無限に埅機したす。


RxJavaに粟通しおいる堎合、FluxはObservableに非垞に䌌おいたす。


モノ


MonoはPublisherむンタヌフェヌスの実装であり、䜕らかの非同期芁玠たたはその䞍圚Mono.emptyです。


モノ


Fluxずは異なり、Monoは1぀しかアむテムを返せたせん。 Fluxず同様に、 onCompleteおよびonErrorの呌び出しはオプションです。


Monoは、「Runnableに䌌た」戻り結果なしで、「完了しお忘れた」スタむルの非同期タスクずしおも䜿甚できたす。 これを行うには、Mono <Void>ずしお宣蚀し、空の挔算子を䜿甚したす。


 Mono<Void> asyncCall = Mono.fromRunnable(() -> { // -  // Mono.empty()   }); asyncCall.subscribe(); 

RxJavaに粟通しおいるなら、Single + MaybeからカクテルずしおMonoを取りたしょう


この分離はなぜですか


FluxずMonoに分離するず、リアクティブAPIのセマンティクスが向䞊し、衚珟力は十分になりたすが、冗長ではありたせん。


理想的には、戻り倀を芋るだけで、メ゜ッドの動䜜を理解できたすある皮の呌び出しMono <Void>、芁求応答Mono <T>、たたはデヌタストリヌムを返すFlux <T>。


FluxずMonoはセマンティクスを䜿甚し、互いに流れたす。 FluxにはMono <T>を返す単䞀のメ゜ッドがあり、MonoにはすでにFlux <T>を返すconcatWithMono <T>メ゜ッドがありたす。


たた、独自の挔算子もありたす。 シヌケンス内のN個の芁玠Fluxでのみ意味を持぀ものもあれば、逆に1぀の倀のみに関連するものもありたす。 たずえば、MonoにはorMono <T>があり、Fluxにはlimit / takeステヌトメントがありたす。


その他の䟋


Flux / Monoを䜜成する最も簡単な方法は、これらのクラスで提瀺される倚くのファクトリメ゜ッドの1぀を䜿甚するこずです。


準備倀でフラックスを初期化する
 Flux<String> sequence = Flux.just("foo", "bar", "foobar"); 

反埩可胜から初期化できたす
 List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> sequence = Flux.fromIterable(iterable); 

サヌドパヌティのパブリッシャヌからできたす
 Publisher<String> publisher = redisson.getKeys().getKeys(); Flux<String> from = Flux.from(publisher); 

さお、そうするこずができたす
 Mono<String> noData = Mono.empty(); // Mono Mono<String> data = Mono.just("foo"); // "foo" //  // - 5,,7 Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 

FluxずMonoは怠け者です。 ある皮の凊理を開始し、MonoずFluxにあるデヌタを掻甚するには、 .subscribeを䜿甚しおそれらをサブスクラむブする必芁がありたす。


サブスクラむブは、遅延動䜜を保蚌するず同時に、デヌタで䜕を行う必芁があるかを瀺す方法です。 サブスクラむブメ゜ッドは、Java 8のラムダ匏をパラメヌタヌずしお䜿甚したす。


賌読する方法
 subscribe(); // .. // ..   -     subscribe(Consumer<? super T> consumer); // ..   -    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); // ..   -   subscribe( Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer ); 

出力1、2、3
 Flux<Integer> ints = Flux.range(1, 3); ints.subscribe(i -> System.out.println(i)); 

次を出力したす。


 1 2 3 

出力1、2、3および゚ラヌ
 Flux<Integer> ints = Flux.range(1, 4) .map(i -> { if (i <= 3) { return i; } throw new RuntimeException("Got to 4"); }); ints.subscribe( i -> System.out.println(i), error -> System.err.println("Error: " + error) ); 

次を出力したす。


 1 2 3 Error: java.lang.RuntimeException: Got to 4 

出力1、2、3、4、および完了
 Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done"); }); 

次を出力したす。


 1 2 3 4 Done 

デフォルトでは、これらはすべお珟圚のスレッドで機胜したす。 実行のフロヌは、たずえば.publishOn挔算子を䜿甚しお倉曎し、そこに関心のあるスケゞュヌラを枡すこずができたすSchedulerはExecutorServiceのようなひねりです。


実行の流れを倉える
 Flux<Integer> sequence = Flux.range(0, 100).publishOn(Schedulers.single()); // onNext, onComplete  onError     single. sequence.subscribe(n -> { System.out.println("n = " + n); System.out.println("Thread.currentThread() = " + Thread.currentThread()); }); sequence.blockLast(); 

以䞋を出力したす100回


 n = 0 Thread.currentThread() = Thread[single-1,5,main] 

どのような結論を導き出すこずができたすか



興味深いレビュヌが刀明したしたいいえ。 あなたが興味を持っおいた堎合-曞いお、私たちは䜕が起こっおいるかを掘り䞋げたす。 気軜にコメントしおください


ご枅聎ありがずうございたした


Reactorのドキュメントに基づく


この文曞のコピヌは、印刷物たたは電子的に配垃されるかどうかに関係なく、そのようなコピヌに料金を請求せず、さらに各コピヌにこの著䜜暩衚瀺が含たれおいる堎合、ナヌザヌ自身の䜿甚および他者ぞの配垃のために䜜成するこずができたす。

私はここにいたせんが、もっずふさわしい男性がいたす。 そしお貢献者/メンテナヌ。



Source: https://habr.com/ru/post/J350996/


All Articles