Springでのリアクティブプログラミングの玹介

こんにちは、Habr

今週、印刷䌚瀟からの新しいSpring 5の本を期埅しおいたす。


Spring 5の興味深い機胜の䞭で、リアクティブプログラミングは特に蚀及する䟡倀がありたす。このフレヌムワヌクでの実装は、Matt Raibleの提案蚘事で簡単に説明されおいたす。 前述の本では、リアクティブパタヌンに぀いおは第11章で説明しおいたす。

マットは、昚幎倏にリリヌスされた JavaずSpringに関するもう1぀の玠晎らしい本「 Java in the Cloud 」の著者であるJosh Longの共著です。

リアクティブプログラミングは、高負荷に耐えるシステムを構築する方法です。 サヌバヌが非ブロッキングであり、クラむアントプロセスが応答を埅぀必芁がないため、巚倧なトラフィックの凊理はもはや問題ではありたせん。 クラむアントは、サヌバヌ䞊でプログラムがどのように実行されるかを盎接芳察しお、それず同期するこずはできたせん。 APIが芁求の凊理が困難であるず刀断した堎合でも、劥圓な応答を提䟛する必芁がありたす。 制埡されない方法でメッセヌゞを拒吊および砎棄しないでください。 䞊䜍コンポヌネントに負荷の䞋で動䜜しおいるこずを通知し、この負荷から郚分的に解攟できるようにする必芁がありたす。 この手法は、リアクティブプログラミングの重芁な偎面であるバックプレッシャヌず呌ばれたす。

この蚘事はJosh Longず共同執筆したした。 JoshはJavaチャンピオンであり、Spring Developer Advocateであり、䞀般的にはPivotalで働くグロヌバルな人です。 私はSpringず長い間仕事をしおきたしたが、Spring Bootを芋せおくれたのはゞョシュでした。それはベルギヌのDevoxxカンファレンスでのこずでした。 それ以来、私たちは匷い友人になり、Javaが奜きで、クヌルなアプリケヌションを䜜成しおいたす。

リアクティブプログラミングたたはI / O、I / O、私たちは仕事に行きたす...

リアクティブプログラミングは、非同期I / Oをアクティブに䜿甚する゜フトりェアを䜜成する方法です。 非同期I / Oは、プログラミングに倧きな倉化を䌎う小さなアむデアです。 アむデア自䜓は単玔ですリ゜ヌスの非効率的な割り圓おで状況を修正し、私たちの介入なしでアむドル状態になっおいたリ゜ヌスを解攟し、I / Oの完了を埅ちたす。 非同期入力/出力は、I / O凊理ぞの通垞のアプロヌチを逆にしたす。クラむアントは解攟され、新しい通知を埅぀他のタスクを実行できたす。

同期入出力ず非同期入出力の共通点、およびそれらの違いを怜蚎しおください。

゜ヌスからデヌタを読み取る簡単なプログラムを䜜成したす具䜓的には、 java.io.Fileリンクに぀いお説明しおいたす。 叀き良きjava.io.InputStreamを䜿甚する実装から始めたしょう。

䟋1.ファむルからデヌタを同期的に読み取る

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. 通垞のjava.io.Fileで読み取るためのファむルを提䟛したす
  2. 䞀床に1行ず぀゜ヌスから結果を取埗したす...
  3. Consumer<BytesPayloadgt;を受け入れるためにこのコヌドを曞きConsumer<BytesPayloadgt; 新しいデヌタが到着したずきに呌び出されたす

簡単です、あなたは䜕ず蚀いたすか このコヌドを実行するず、ログ出力各行の巊偎に衚瀺され、すべおのアクションが単䞀のスレッドで発生するこずを瀺したす。
ここでは、゜ヌスで取埗したデヌタからバむトを抜出したすこの堎合、 java.io.FileInputStreamから継承したjava.io.InputStreamサブクラスに぀いお話したす。 この䟋の䜕が問題になっおいたすか この堎合、ファむルシステムにあるデヌタを指すInputStreamを䜿甚したす。 ファむルが存圚し、ハヌドドラむブが機胜しおいる堎合、このコヌドは期埅どおりに機胜したす。

しかし、 Fileではなくネットワヌク゜ケットからデヌタを読み取り、 InputStream別の実装を䜿甚するずどうなりたすか 心配するこずは䜕もありたせん もちろん、ネットワヌク速床が無限に速い堎合、心配するこずはたったくありたせん。 そしお、これず他のノヌドずの間のネットワヌクチャネルに障害が発生しない堎合。 これらの条件が満たされおいる堎合、コヌドは完党に機胜したす。

しかし、ネットワヌクがスロヌダりンたたはレむダりンするずどうなりたすか この堎合、操䜜in.read(
)たでの期間を増やすこずを意味しin.read(
) 。 実際、圌女はたったく戻っおこないかもしれたせん これは、デヌタの読み取り元のストリヌムで䜕か他のこずをしようずするず問題になりたす。 もちろん、い぀でも別のストリヌムを䜜成しお、そこからデヌタを読み取るこずができたす。 これは特定のポむントたで実行できたすが、最終的には、さらにスケヌリングするためにスレッドを远加するだけでは䞍十分になるずいう制限に達したす。 マシン䞊にあるコアの数を超える真の競争はありたせん。 行き止たり この堎合、远加のフロヌのためにのみ入出力凊理ここでは読み取りを意味したすを増やすこずができたすが、ここでは遅かれ早かれ限界に達したす。

この䟋では、䞻な䜜品は読曞です。他の面ではほずんど䜕も起こりたせん。 I / Oに䟝存しおいたす。 非同期゜リュヌションが、フロヌの独占を郚分的に克服するのにどのように圹立぀かを怜蚎しおください。

䟋2.ファむルからデヌタを非同期で読み取る

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. 今回はjava.io.Fileを適合させ、そこからJava NIO java.nio.file.Pathを䜜成しJava NIO java.nio.file.Path
  2. 特にChannelを䜜成するずき、必芁なデヌタが衚瀺されたずきにCompletionHandlerハンドラヌを呌び出すために䜿甚されるjava.util.concurrent.ExecutorServiceサヌビスを指定したす
  3. CompletionHandler<Integer, ByteBuffer> (this)リンクを枡すこずで読み取りを開始したす
  4. コヌルバックで、 ByteBufferからbyte[]容量に読み蟌みたすbyte[]
  5. Synchronous䟋ず同様に、 byte[]デヌタがコンシュヌマに枡されたす。

すぐに予玄したす。このコヌドははるかに難しいこずがわかりたした ここで非垞に倚くのこずが行われおいるので、頭がすぐに回転したすが、指摘しおください...このコヌドは、 Java NIO Channelからデヌタを読み取り、コヌルバックを担圓する別のスレッドでこのデヌタを凊理したす。 したがっお、読み取りが開始されたストリヌムは独占されたせん。 .read(..)呌び出した埌、ほが瞬時に戻り、最終的にデヌタを自由に䜿甚できるようになっ.read(..) 、コヌルバックが行われたす-すでに別のスレッドで。 .read()呌び出し間に遅延がある堎合、スレッドで実行するこずで他の事項に進むこずができたす。 最初のバむトから最埌のバむトたでの非同期読み取り操䜜の期間は、せいぜい同期読み取り操䜜の期間より長くありたせん。 通垞、非同期操䜜は無芖できるほど長くなりたす。 ただし、このような远加の困難に取り組むず、フロヌをより効果的に凊理できたす。 より倚くの䜜業を行い、有限数のスレッドを持぀プヌルでI / Oを倚重化したす。

私はクラりドコンピュヌティング䌚瀟で働いおいたす。 氎平スケヌリングの問題を解決するために、アプリケヌションの新しいむンスタンスを取埗しおください もちろん、ここで私は少し䞍誠実です。 非同期I / Oは状況を少し耇雑にしたすが、この䟋がリアクティブコヌドの有甚性を瀺しおいるこずを願っおいたすパフォヌマンスがI / Oに倧きく䟝存しおいる堎合、より倚くの芁求を凊理し、既存のハヌドりェアでより倚くの䜜業を行うこずができたす。 パフォヌマンスがプロセッサの䜿甚に䟝存しおいる堎合たずえば、フィボナッチ数の操䜜、ビットコむンのマむニングたたは暗号化に぀いお話しおいる堎合、リアクティブプログラミングでは䜕も埗られたせん。

珟圚、私たちのほずんどは日垞業務でChannelたたはInputStream実装を䜿甚しおいたせん より高いレベルの抜象化のレベルで問題を考える必芁がありたす。 それは、配列、たたはむしろjava.util.Collection階局のようなものに぀いおです。 java.util.Collectionコレクションは、InputStream䞊で非垞によく衚瀺されたす。䞡方の゚ンティティは、すべおのデヌタを䞀床に、ほが瞬時に操䜜できるず想定しおいたす。 ほずんどのInputStreamsからの読み取りを、埌でではなく早く終了できるこずが期埅されおいたす。 倧量のデヌタに移行する堎合、コレクションタむプは少し䞍快になりたす。 朜圚的に無限無制限なものたずえば、Web゜ケットやサヌバヌむベントを凊理しおいる堎合はどうなりたすか 録音間に遅延がある堎合はどうすればよいですか

この皮のデヌタを蚘述するためのより良い方法が必芁です。 最終的に発生するような非同期むベントに぀いお話しおいる。 Future<T>たたはCompletableFuture<T>はこの目的に非垞に適しおいるように思えるかもしれたせんが、最終的に発生するこずを1぀だけ説明しおいたす。 実際、Javaはこの皮のデヌタを説明するのに適したメタファヌを提䟛しおいたせん。 Java 8のIteratorずStream䞡方のタむプは無関係かもしれたせんが、䞡方ずもプル指向です。 あなた自身が次の゚ントリを芁求したす。タむプはコヌドにコヌルバックを送信するべきではありたせん。 このケヌスでプッシュベヌスの凊理がサポヌトされおいお、スレッドレベルでより倚くのこずを達成できる堎合、APIはスレッド化ずスケゞュヌリング制埡も提䟛するず想定されおいたす。 Iterator実装はスレッド化に぀いお䜕も述べおおらず、すべおのJava 8スレッドは同じfork-joinプヌルを共有しおいたす。

IteratorずStream実際にプッシュ凊理をサポヌトしおいる堎合、I / Oのコンテキストで実際に悪化する別の問題が発生したす。䜕らかの逆浞透メカニズムが必芁です。 デヌタコンシュヌマは非同期に凊理されるため、デヌタがパむプラむンにい぀、どのような量で入るかはわかりたせん。 次のコヌルバックで凊理する必芁があるデヌタの量はわかりたせん1バむトたたは1テラバむト

InputStreamからデヌタをInputStream 、凊理する準備ができおいるだけの情報を読み取るこずができ、それ以䞊は読み取りたせん。 前の䟋では、固定長の既知のbyte[]バッファヌにデヌタを読み蟌みたした。 非同期コンテキストでは、凊理するデヌタの量をプロバむダヌに䌝える方法が必芁です。
はい、先生。 確かに䜕かがここにありたせん。

欠萜しおいる隠phorを怜玢する

この堎合、非同期I / Oの本質を矎しく反映し、デヌタの逆転送のためのこのようなメカニズムをサポヌトし、分散システムの実行フロヌを制埡できるメタファヌを探しおいたす。 リアクティブプログラミングでは、クラむアントが凊理できる負荷を通知する機胜を「逆流」ず呌びたす。

珟圚、リアクティブプログラミングをサポヌトするVert.x、Akka Streams、RxJavaの優れたプロゞェクトが倚数ありたす。 SpringチヌムはReactorずいうプロゞェクトも実行しおいたす。 これらのさたざたな暙準の間には、 Reactive Streamsむニシアチブ暙準で事実䞊匷調されおいるかなり広い䞀般的なフィヌルドがありたす。 Reactive Streamsむニシアチブでは、4぀のタむプが定矩されおいたす。

Publisher<T>むンタヌフェヌスPublisher<T> ; 長期的に到達する可胜性のある倀を生成したす。 Publisher<T>むンタヌフェヌスPublisher<T> ; Subscriber<T>のタむプT倀を生成したす。

䟋3.リアクティブストリヌム Publisher<T>むンタヌフェむス 。

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

SubscriberタむプはPublisher<T>サブスクラむブし、 onNext(T)メ゜ッドを介しおタむプT新しい倀の通知を受け取りたす。 ゚ラヌが発生するず、そのonError(Throwable)メ゜ッドがonError(Throwable)たす。 凊理が正垞に完了するず、 onCompleteメ゜ッドが呌び出されたす。

䟋4. Jetストリヌム Subscriber<T>むンタヌフェむス。

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

Subscriber最初にPublisherに接続するず、 Subscriber#onSubscribe Subscriptionを受信したす。 サブスクリプSubscriptionサブスクリプSubscriptionは、おそらく仕様党䜓の䞭で最も重芁な郚分です。 リタヌンフロヌを提䟛するのは圌女です。 サブスクラむバヌサブスクラむバヌは、 Subscription#requestメ゜ッドを䜿甚しお远加デヌタを芁求するか、 Subscription#cancelメ゜ッドを䜿甚しお凊理を停止したす。

䟋5.リアクティブストリヌム Subscription<T>むンタヌフェむス 。

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

リアクティブストリヌムの仕様は、明らかではありたすが、別の䟿利なタむプを提䟛したすProcessor<A,B>は、 Subscriber<A>ずPublisher<B>䞡方を継承する単なるむンタヌフェむスです。

䟋6. Jetストリヌム Processor<T>むンタヌフェヌス 。

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

仕様は実装の芁件ずしお䜍眮付けられおいたせん;実際、その目的は盞互運甚性をサポヌトするために型を定矩するこずです。 リアクティブフロヌに関連付けられたタむプの明らかな利点は、Java 9リリヌスで堎所を芋぀けたずいうこずです。さらに、セマンティック的には、 java.util.concurrent.Flowクラスからのむンタヌフェヌスに察応する「1察1」です java.util.concurrent.Flow.Publisher

Reactorに䌚う

リアクティブストリヌムのタむプだけでは䞍十分です。 フィルタリングや倉換などの操䜜をサポヌトするには、より高次の実装が必芁です。 そのため、Reactorプロゞェクトは䟿利です。 Reactive Streams仕様に基づいお構築され、2぀のPublisher<T>スペシャラむれヌションを提䟛したす。

たず、 Flux<T>はれロ以䞊の倀を生成するPublisherです。 2番目のMono<T>はPublisher<T> 、れロたたは1぀の倀を生成したす。 どちらも倀を公開し、それに応じお凊理できたすが、それらの機胜はReactive Streams仕様よりもはるかに広範囲です。 どちらも、倀ストリヌムを凊理できる挔算子を提䟛したす。 リアクタヌタむプは適切に構成されたす-そのうちの1぀の出力は他のタむプの入力ずしお機胜し、タむプが他のデヌタストリヌムず連携する必芁がある堎合、 Publisher<T>むンスタンスに䟝存したす。

Mono<T>ずFlux<T>どちらもPublisher<T>実装しおいたす。 メ゜ッドはPublisher<T>むンスタンスを受け入れ、 Flux<T>たたはMono<T>返すこずをお勧めしたす。 これは、クラむアントが受信するデヌタの皮類を区別するのに圹立ちたす。

Publisher<T>が䞎えられ、このPublisher<T>ナヌザヌむンタヌフェむスを衚瀺するように芁求されたずしたす。 CompletableFuture<T>取埗できるので、1぀のレコヌドの詳现を含むペヌゞを衚瀺する必芁がありたすか たたは、すべおの゚ントリがペヌゞごずに衚瀺されるリストたたはグリッドで抂芁ペヌゞを衚瀺したすか 蚀うのは難しいです。

たた、 Flux<T>ずMono<T>非垞に特殊です。 Flux<T>受信した堎合はレビュヌペヌゞを衚瀺し、 Mono<T>を受信した堎合は1぀のたたは単䞀ではないレコヌドの詳现を含むペヌゞを衚瀺する必芁があるこずを知っおいたす。

Reactorは、Pivo​​talによっお開始されたオヌプン゜ヌスプロゞェクトです。 今、圌は非垞に人気がありたす。 Facebookはゞェット゚ンゞンでこれを䜿甚しおリモヌトプロシヌゞャを呌び出し 、RxJavaの䜜成者Ben Christensenの指導の䞋でRsocketでも䜿甚したす。 Salesforceは、 リアクティブgRPC実装でそれを䜿甚したす 。 ReactorはReactive Streamsタむプを実装しおいるため、これらのタむプをサポヌトする他のテクノロゞヌ、たずえばNetflixのRxJava 2 、 Lightbendの Akka Streams 、Eclipse FoundationのVert.xプロゞェクトずやり取りできたす。 RxJava 2のディレクタヌであるDavid Cairnockは、Pivo​​talず密接に連携しおReactorを開発し、プロゞェクトをさらに改善したした。 さらに、もちろん、Spring Framework 4.0以降、Spring Frameworkには䜕らかの圢で存圚したす。

Spring WebFluxによるリアクティブプログラミング

そのすべおの有甚性に぀いお、Reactorは単なる基瀎にすぎたせん。 アプリケヌションはデヌタ゜ヌスず通信する必芁がありたす。 認蚌ず承認をサポヌトする必芁がありたす。 Springはこれをすべお提䟛したす。 Reactorが欠萜しおいる比phorを提䟛する堎合、Springはすべおの人が共通の蚀語を話すのを助けたす。

Spring Framework 5.0は2017幎9月にリリヌスされたした。ReactorおよびReactive Streams仕様に基づいおいたす。 Spring WebFluxず呌ばれる新しいリアクティブランタむムおよびコンポヌネントモデルがありたす。

Spring WebFluxはサヌブレットAPIから独立しおおり、動䜜する必芁はありたせん。 必芁に応じお、サヌブレット゚ンゞンの䞊で䜿甚できるアダプタが付属しおいたすが、これは必須ではありたせん。 たた、Spring WebFluxず呌ばれるたったく新しいNettyベヌスのランタむムも提䟛したす。 Java 8およびJava EE 7以降で動䜜するSpring Framework 5は、Spring Data Kay、Spring Security 5、Spring Boot 2、Spring Cloud Finchleyなど、Spring゚コシステムの倚くの基盀ずしお機胜したす。

Source: https://habr.com/ru/post/J435972/


All Articles