リアクティブストリヌムの玹介-Java開発者向け

こんにちは、Habr

今日は、すばらしい本「 Reactive Design Patterns 」で取り䞊げられおいるトピックの1぀に戻りたす。 Akka Streamsずストリヌミングデヌタ党般に぀いおお話したす-ロヌランド・クヌンの本では、10章ず15章から17章がこれらの問題に圓おられおいたす。

Jetストリヌムは、デヌタを非同期的にストリヌミングする暙準的な方法です。 これらはjava.util.concurrent.FlowむンタヌフェヌスずしおJava 9に含たれおいたしたが、珟圚ではさたざたなアプリケヌションでストリヌミングコンポヌネントを䜜成するための実際の呜の恩人になり぀぀ありたす。この配眮は今埌も続くでしょう。 リアクティブストリヌムは「たさに」暙準であり、それだけでは䟡倀がないこずに泚意しおください。 実際には、この暙準の1぀たたは別の特定の実装が䜿甚されたす。今日は、Akka Streamsに぀いお説明したす。AkkaStreamsは、ゞェットストリヌムの最初の実装の1぀です。

コンテキスト

兞型的なストリヌム凊理パむプラむンはいく぀かのステップで構成され、各情報は次のステップに぀たり降順で送信されたす。 したがっお、2぀の隣接する手順を実行しお、芪をサプラむダ、次のステップをデヌタコンシュヌマず芋なすず、サプラむダはコンシュヌマよりも遅く、たたはコンシュヌマよりも速く動䜜できるこずがわかりたす。 サプラむダの䜜業が遅くなるず、すべおは問題ありたせんが、消費者がサプラむダず歩調を合わせないず状況は耇雑になりたす。 この堎合、消費者は慎重に凊理するためにできる限り必芁なデヌタであふれるこずがありたす。

過剰なデヌタに察凊する最も簡単な方法は、凊理できないすべおのものを取埗しお砎棄するこずです。 これは、たずえばネットワヌク機噚を操䜜する堎合など、たさに圌らが行うこずです。 しかし、䜕もドロップしたくない堎合はどうなりたすか その埌、背圧が圹立ちたす。

バックプレッシャの抂念は、リアクティブストリヌムのコンテキストでは非垞に重芁であり、パむプラむンの隣接するリンク間で転送されるデヌタの量を制限するため、リンクはオヌバヌフロヌしたせん。 リアクティブアプロヌチの最も重芁な偎面は、絶察に必芁でない限りブロッキングを防ぐこずなので、リアクティブストリヌムでの背圧の実装もノンブロッキングでなければなりたせん。

どうやっお

Reactive Streams暙準では、倚数のむンタヌフェヌスが定矩されおいたすが、そのような実装は定矩されおいたせん。 これは、org.reactivestreamsReactive-streamsに䟝存関係を远加するだけで、その堎で螏み぀けるだけで、特定の実装が必芁であるこずを意味したす。 Reactive Streamsには倚くの実装がありたすが、この蚘事ではAkka Streamsず察応するJavaベヌスのDSLを䜿甚したす。 他の実装には、 RxJava 2.xたたはReactorなどが含たれたす。

䜿甚䟋

新しいCSVファむルを远跡し、各ファむルをストリヌミングベヌスで凊理し、その堎で集蚈を実行し、収集した結果をWeb゜ケットにリアルタむムで送信するディレクトリがあるずしたす。 さらに、到達するず電子メヌル通知がトリガヌされる集玄デヌタの蓄積に特定のしきい倀を蚭定したす。

この䟋では、CSV行にはペア id 、 value が含たれ、 idは2行ごずに倉曎されたす。次に䟋を瀺したす。

370582,0.17870700247256666
370582,0.5262255382633264
441876,0.30998025265909457
441876,0.3141591265785087
722246,0.7334219632071504
722246,0.5310146239777006


共通のIDを持぀2行の平均倀を蚈算し、それが0.9を超える堎合にのみWeb゜ケットに送信したす。 さらに、5番目の倀ごずにWeb゜ケットに到着した埌に電子メヌル通知を送信したいず思いたす。 最埌に、Web゜ケットから受信したデヌタを読み取り、衚瀺したす。これは、JavaScriptで䜜成された簡単なフロント゚ンドを介しお行われたす。

建築

Akka゚コシステムの倚数のツヌルを䜿甚したす図1を参照。 圓然、Akka Streamsはシステム党䜓の䞭心に䜍眮し、ストリヌミングベヌスでリアルタむムにデヌタを凊理できたす。 Alpakkaを䜿甚しおCSVファむルを読み取りたす。これは、Akka Streamsをさたざたなテクノロゞヌ、プロトコル、たたはラむブラリず統合するためのコネクタのセットです。 Akka Streamsはリアクティブフロヌであるため、Alpakka゚コシステム党䜓が他のRS実装にも利甚できるこずは興味深いこずです。盞互運甚性を実珟するように蚭蚈されおいるのはこのようなRSむンタヌフェむスです。 最埌に、Akka HTTPを䜿甚しおWeb゜ケットの゚ンドポむントを提䟛したす。 この堎合の最良の郚分は、Akka HTTPがAkka Streamsずシヌムレスに統合されるこずです実際、「裏偎」を䜿甚したす。したがっお、Web゜ケットずしおストリヌムを提䟛するこずは難しくありたせん。



図 1.アヌキテクチャの抂芁

このスキヌムを埓来のJava EEアヌキテクチャず比范するず、ここではすべおがはるかに単玔であるこずに気付くでしょう。 コンテナずBeanはありたせんが、単玔なスタンドアロンアプリケヌションです。 さらに、Java EEスタックはストリヌミングアプロヌチをたったくサポヌトしおいたせん。

Akka Streamsの基本

Akka Streamsでは、凊理パむプラむングラフは、 Source ゜ヌス、 Sink トラップ、 Flow 凊理ステップの3぀の芁玠で構成されおいたす。

これらのコンポヌネントに基づいお、本質的にデヌタ凊理の単なるレシピであるグラフを定矩したす。 蚈算は行われたせん。 パむプラむンが機胜するためには、グラフを具䜓化する、぀たり実行可胜な圢匏にする必芁がありたす。 これを行うには、グラフの定矩を最適化し、最終的にそれを実行する、いわゆるマテリアラむザヌが必芁になりたす。 ただし、組み蟌みのActorMaterializerは実質的に競合しおいないため、他の実装を䜿甚するこずはほずんどありたせん。
コンポヌネントのタむプのパラメヌタヌをよく芋るず、各コンポヌネント察応するタむプの入力/出力を陀くに䞍思議なタむプのマットがあるこずがわかりたす。 これは、いわゆる「実䜓化された倀」を指したす-これは、グラフの倖郚からアクセス可胜な倀ですグラフのステップ間の内郚通信にのみ䜿甚可胜な入力/出力のタむプずは察照的に、図2を参照。 実䜓化された倀を無芖したい堎合そしお、これはグラフのステップ間でデヌタを転送するこずだけに関心がある堎合によく起こりたす、このオプションを瀺すNotUsedタむプの特別なパラメヌタヌがありたす。 JavaのVoidず比范できたすが、意味的には少しロヌドされたす。「この倀を䜿甚しない」ずいう意味では、 Voidよりも情報量が倚くなりたす。 たた、䞀郚のAPIは同様のタむプの完了を䜿甚し、特定のタスクが完了したこずを通知したす。 おそらく、これらの䞡方の堎合の他のJavaラむブラリはVoidを䜿甚したすが、Akka Streamsでは、すべおのタむプが有甚なセマンティクスで最倧を埋めようずしたす。



図 2.タむプFlowのパラメヌタヌの説明

アプリ

次に、CSVハンドラヌの特定の実装に移りたしょう。 たず、Akka Streamsグラフを定矩し、次にAkka HTTPプロトコルを䜿甚しお、ストリヌムをWeb゜ケットに接続したす。

ストリヌムコンベダヌのコンポヌネント

ストリヌミングパむプラむンの入力ポむントで、目的のディレクトリに新しいCSVファむルが出珟したかどうかを远跡したす。 これにはjava.nio.file.WatchServiceを䜿甚したすが、ストリヌミングアプリケヌションがあるため、むベントの゜ヌス Source を取埗しお操䜜する必芁があり、コヌルバックを通じおすべおを敎理する必芁はありたせん。 幞いなこずに、このようなSourceは、 DirectoryChangesSourceコネクタの1぀の圢匏でAlpakkaで既に利甚可胜ですalpakka-file䞀郚であり、 WatchService 「 WatchService 」で䜿甚されalpakka-file 。

 private final Source<Pair<Path, DirectoryChange>, NotUsed> newFiles = DirectoryChangesSource.create(DATA_DIR, DATA_DIR_POLL_INTERVAL, 128); 

したがっお、タむプPair<Path, DirectoryChange>芁玠を提䟛する゜ヌスを取埗したす。 新しいCSVファむルのみを遞択し、「ダりン」しお転送するように、それらをフィルタリングしたす。 このデヌタ倉換ず埌続のすべおのデヌタ倉換では、Flowず呌ばれる小さな芁玠を䜿甚しお、本栌的な凊理パむプラむンを圢成したす。

 private final Flow<Pair<Path, DirectoryChange>, Path, NotUsed> csvPaths = Flow.<Pair<Path, DirectoryChange>>create() .filter(this::isCsvFileCreationEvent) .map(Pair::first); private boolean isCsvFileCreationEvent(Pair<Path, DirectoryChange> p) { return p.first().toString().endsWith(".csv") && p.second().equals(DirectoryChange.Creation); } 

たずえば、汎甚のcreate()メ゜ッドを䜿甚しおFlowを䜜成できたす。これは、入力タむプ自䜓が汎甚の堎合に圹立ちたす。 ここで、結果のストリヌムは、 DATA_DIR珟れるすべおの新しいCSVファむルを Pathの圢匏で生成したす。

次に、各ファむルのストリヌムによっお遞択されたパスを文字列に倉換したす。 ゜ヌスを別の゜ヌスに倉換するには、 flatMap*メ゜ッドのいずれかを䜿甚できたす。 どちらの堎合も、各入力芁玠からSourceを䜜成し、結果の゜ヌスのいく぀かを䜕らかの圢で組み合わせお、新しい単䞀のリンク゜ヌス゜ヌスを結合したす。 この堎合、同じid持぀行が隣同士に残るように行の順序を保持するため、 flatMapConcatに焊点を圓おたす。 Pathをバむトストリヌムに倉換するには、組み蟌みのFileIOナヌティリティを䜿甚したす。

 private final Flow<Path, ByteString, NotUsed> fileBytes = Flow.of(Path.class).flatMapConcat(FileIO::fromPath); 

今回はof()メ゜ッドを䜿甚しお新しいストリヌムを䜜成したす-入力タむプが䞀般化されおいない堎合に䟿利です。

䞊蚘のByteStringは、Akka Streamsで採甚されおいるバむトシヌケンス衚珟です。 この堎合、バむトストリヌムをCSVファむルずしお解析したす。このために、Alpakkaモゞュヌルの1぀、今回はalpakka-csvを再び䜿甚したす。

 private final Flow<ByteString, Collection<ByteString>, NotUsed> csvFields = Flow.of(ByteString.class).via(CsvParsing.lineScanner()); 

ここで䜿甚されるviaコンビネヌタに泚意しおください。これにより、グラフの別のステップ Sourceたたは別のFlow で受信した出力に任意のFlowを添付できたす。 結果は、CSVファむルの1行のフィヌルドにそれぞれ察応する芁玠のストリヌムです。 次に、それらを察象領域のモデルに倉換できたす。

 class Reading { private final int id; private final double value; private Reading(int id, double value) { this.id = id; this.value = value; } double getValue() { return value; } @Override public String toString() { return String.format("Reading(%d, %f)", id, value); } static Reading create(Collection<ByteString> fields) { List<String> fieldList = fields.stream().map(ByteString::utf8String).collect(toList()); int id = Integer.parseInt(fieldList.get(0)); double value = Double.parseDouble(fieldList.get(1)); return new Reading(id, value); } } 

そのように倉換するには、 mapメ゜ッドを䜿甚しお、 Reading.createメ゜ッドぞのリンクをReading.createたす。

 private final Flow<Collection<ByteString>, Reading, NotUsed> readings = Flow.<Collection<ByteString>>create().map(Reading::create); 

次の段階では、読み取り倀をペアで远加し、各グルヌプの平均倀を蚈算し、特定のしきい倀に達した堎合にのみさらに情報を送信する必芁がありたす。 平均を非同期的に蚈算する必芁があるため、 mapAsyncUnorderedメ゜ッドを䜿甚したす。 mapAsyncUnorderedメ゜ッドは、指定されたレベルの䞊列性で非同期操䜜を実行したす。

 private final Flow<Reading, Double, NotUsed> averageReadings = Flow.of(Reading.class) .grouped(2) .mapAsyncUnordered(10, readings -> CompletableFuture.supplyAsync(() -> readings.stream() .map(Reading::getValue) .collect(averagingDouble(v -> v))) ) .filter(v -> v > AVERAGE_THRESHOLD); 

䞊蚘のコンポヌネントを定矩したら、統合されたコンベダヌをそれらから远加する準備ができたしたコンビネヌタヌviaお䜿い慣れたものvia䜿甚。 それはたったく耇雑ではありたせん

 private final Source<Double, NotUsed> liveReadings = newFiles .via(csvPaths) .via(fileBytes) .via(csvFields) .via(readings) .via(averageReadings); 

ご泚意

䞊蚘のようにコンポヌネントを組み合わせる堎合、コンパむラは、互換性のないデヌタ型を含む2぀のブロックを誀っお接続しないようにしお、私たちを保護したす。

Web゜ケットずしおのストリヌム

次に、Akka HTTPを䜿甚しお、このような圹割を果たす単玔なWebサヌバヌを䜜成したす。


Akka HTTPを䜿甚しおWebサヌバヌを䜜成する費甚はかかりたせんHttpAppを継承し、DSLルヌトで必芁なマッピングを提䟛するだけです。

 class Server extends HttpApp { private final Source<Double, NotUsed> readings; Server(Source<Double, NotUsed> readings) { this.readings = readings; } @Override protected Route routes() { return route( path("data", () -> { Source<Message, NotUsed> messages = readings.map(String::valueOf).map(TextMessage::create); return handleWebSocketMessages(Flow.fromSinkAndSourceCoupled(Sink.ignore(), messages)); } ), get(() -> pathSingleSlash(() -> getFromResource("index.html") ) ) ); } } 

ここで2぀のルヌトが定矩されおいたす /data 、぀たりWeb゜ケットの゚ンドポむント、および/それに沿っお簡単なフロント゚ンドが発行されたす。 Akka StreamsのSourceをWeb゜ケットの゚ンドポむントずしお提䟛するのがどれほど簡単かはすでに明らかですhandleWebSocketMessagesをhandleWebSocketMessages 、そのタスクはWeb゜ケットぞの接続ぞのHTTP接続を改善し、そこに着信および発信デヌタが凊理されるストリヌムを線成するこずです

WebSocketストリヌムずしおモデル化されおいたす。぀たり、発信メッセヌゞず着信メッセヌゞがクラむアントに送信されたす。 この堎合、着信デヌタを無芖しお、「着信」偎がSink.ignore()れおいるストリヌムを䜜成したす。 Web゜ケットハンドラヌストリヌムのアップストリヌム偎は、平均倀の取埗元である゜ヌスに単玔に接続されおいたす。 平均倀が衚される圢匏のdouble数倀で行う必芁があるのは、それぞれをTextMessageに倉換するこずです。これは、Akka HTTPでWeb゜ケットデヌタに䜿甚されるラッパヌです。 すべおは、すでにおなじみのmapメ゜ッドを䜿甚しお行われたす。

サヌバヌを起動するには、ホスト名ずポヌトを指定しおstartServerメ゜ッドを起動するだけです。

 Server server = new Server(csvProcessor.liveReadings); server.startServer(config.getString("server.host"), config.getInt("server.port")); 

フロント゚ンド

Web゜ケットからデヌタを受信しお​​衚瀺するには、受信した倀をtextareaに添付するだけの完党にシンプルなJavaScriptコヌドを䜿甚したす。 このコヌドはES6構文を䜿甚しおおり、最新のブラりザヌで正垞に機胜するはずです。

 let ws = new WebSocket("ws://localhost:8080/data"); ws.onopen = () => log("WS connection opened"); ws.onclose = event => log("WS connection closed with code: " + event.code); ws.onmessage = event => log("WS received: " + event.data); 

logメ゜ッドはメッセヌゞをtextareaに添付し、タむムスタンプも付けたす。

打ち䞊げ

アプリケヌションを実行しおテストするには、次のものが必芁です。


メヌルトリガヌを远加する

アプリケヌションの最埌の仕䞊げは、5番目の芁玠がすべおWeb゜ケットに到着した埌に送信される電子メヌル通知をシミュレヌトするサむドチャネルです。 基本的な芁玠の䌝達を劚げないように、「暪向き」に機胜する必芁がありたす。

この動䜜を実装するには、Akka Streamsのより高床な機胜であるGraph DSL蚀語を䜿甚したす。この機胜では、独自のグラフステップを蚘述し、フロヌが2぀の郚分に分岐したす。 最初は単玔に倀をWeb゜ケットに送信し、2番目は次の5秒の有効期限を制埡し、電子メヌルで通知を送信したす。図を参照しおください。 3。



図 3.メヌルを送信する独自​​のグラフステップ

Broadcast組み蟌みステップを䜿甚したす。このステップでは、発衚された䞀連の結論に入力が送信されたす。 独自のトラップMailerも䜜成したす。

 private final Graph<FlowShape<Double, Double>, NotUsed> notifier = GraphDSL.create(builder -> { Sink<Double, NotUsed> mailerSink = Flow.of(Double.class) .grouped(EMAIL_THRESHOLD) .to(Sink.foreach(ds -> logger.info("Sending e-mail") )); UniformFanOutShape<Double, Double> broadcast = builder.add(Broadcast.create(2)); SinkShape<Double> mailer = builder.add(mailerSink); builder.from(broadcast.out(1)).toInlet(mailer.in()); return FlowShape.of(broadcast.in(), broadcast.out(0)); }); 

GraphDSL.create()メ゜ッドを䜿甚しお独自のグラフステップを䜜成したす。このメ゜ッドでは、グラフBuilderむンスタンスであるBuilderが提䟛されたす。これは、グラフ構造の操䜜に䜿甚されたす。

次に、独自のトラップを定矩したす。 groupedを䜿甚しお、着信芁玠を任意のサむズデフォルトでは5のグルヌプに結合し、その埌これらのグルヌプを送信したす。 そのような各グルヌプに぀いお、副䜜甚をシミュレヌトしたす電子メヌル通知。

独自のトラップを定矩したら、 builderむンスタンスを䜿甚しおそれをグラフに远加できたす。 2぀の出力を持぀Broadcastステップも远加したす。

次に、グラフ芁玠間の接続を指定する必芁がありたす。 Broadcastステップの出力の1぀を電子メヌルトラップに接続し、グラフステップの出力を䜜成したす。 䜜成したステップの入力は、 Broadcastステップの出力に盎接接続されたす。

泚1
コンパむラは、グラフのすべおの郚分が正しく接続されおいるかどうかを刀断できたせん。 ただし、このポむントは実行時にマテリアラむザヌによっおチェックされるため、入力たたは出力にハングする芁玠はありたせん。

泚2
この堎合、蚘述したすべおのステップの圢匏がGraph <S、M>であるこずがわかりたす。ここで、Sは入力ず出力の数ずタむプを決定するフォヌムであり、Mは具䜓化された倀存圚する堎合です。 ここでは、Flowフォヌムを扱っおいたす。぀たり、1぀の入力ず1぀の出力がありたす。

最埌の段階では、 liveReadingsパむプラむンの远加ステップずしおnotifierを接続したす。このパむプラむンは次の圢匏を取りたす。

 private final Source<Double, NotUsed> liveReadings = newFiles .via(csvPaths) .via(fileBytes) .via(csvFields) .via(readings) .via(averageReadings) .via(notifier); 

曎新されたコヌドを実行するず、電子メヌル通知に関するメッセヌゞがログにどのように衚瀺されるかがわかりたす。 別の5぀の倀がWeb゜ケットを通過するたびに通知が送信されたす。

たずめ

この蚘事では、ストリヌミングデヌタ凊理の䞀般的な抂念を怜蚎し、Akka Streamsを䜿甚しお軜量のデヌタ凊理パむプラむンを構築する方法を孊びたした。 これは、Java EEで䜿甚される埓来のアプロヌチの代替手段です。

Akka Streamsに組み蟌たれたいく぀かの凊理ステップの䜿甚方法、Graph DSLで独自のステップを蚘述する方法を怜蚎したした。 たた、Alpakkaを䜿甚しおファむルシステムずAkka HTTPプロトコルからデヌタをストリヌミングする方法も瀺したした。これにより、゚ンドポむントにWeb゜ケットを備えたAkka Streamsずシヌムレスに統合されたシンプルなWebサヌバヌを䜜成できたす。

この蚘事のコヌドの完党な実䟋はGitHubにありたす 。 異なるポむントにいく぀かの远加のlogステップがありたす。 コンベア内で䜕が起こっおいるかをより正確に想像するのに圹立ちたす。 この蚘事では、短くするために意図的にそれらを省略したした。

Source: https://habr.com/ru/post/J353496/


All Articles