
ã¿ãªããããã«ã¡ã¯ïŒ ç§ã®ååã¯ãªã§ãŒã«ã§ãFunCorpã®ããã¯ãšã³ãéçºè
ãšããŠåããŠããŸãã 仿¥ã¯ããªã¢ã¯ãã£ãããã°ã©ãã³ã°ãReactorã©ã€ãã©ãªãããã³Webã«ã€ããŠå°ã説æããŸãã
ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ã¯ãã°ãã°ãèšåããããŸãããïŒèšäºã®èè
ã®ããã«ïŒããã§ããããäœã§ãããããããªãå Žåã¯ãå¿«é©ã«ãªã£ãŠãäžç·ã«èããŠã¿ãŠãã ããã
ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ãšã¯äœã§ããïŒ
ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ã¯ãéåæããŒã¿ã¹ããªãŒã ã®ç®¡çã§ãã ãšãŠãç°¡åã§ãã ç§ãã¡ã¯ãã£ãã¡ãªäººã
ã§ããã ããªãã®ãããã§ã¹ãã®ãã¹ãŠãè©³çŽ°ã«æãäžããããšã¯ããŸããããããã¯äŸ¡å€ãããã§ãããã
ãããŠããŠã§ãã¯ã©ãã«ãããŸããïŒ
ãããã«ãããšãHTTPãµãŒããŒããããŒã¿ããŒã¹ãã©ã€ããŒã§çµããReactive Manifestoã®ãã¹ãŠã®æšæºã«åŸã£ãŠããªã¢ã¯ãã£ãã«ã·ã¹ãã ãæ§ç¯ãããšã忥ããå¯èœæ§ããããŸãã ããŠããŸãã¯å°ãªããšãé«å質ã®ããã¯ãšã³ããæ§ç¯ããŸãã
ããã¯ããã¡ãããç°¡åãªguã§ãã ãã ãããŠãŒã¶ãŒã±ãŒã¹ãè€æ°ã®ãªã¯ãšã¹ããåŠçããå¿
ãããé«éã§ã¯ãªãå ŽåããµãŒãã¬ããã³ã³ããã察å¿ã§ããªããªã£ãå Žåã¯ããªã¢ã¯ãã£ãã®çŸããäžçã«ããããïŒ
128ã®é£ç¶ãã䞊åãªã¯ãšã¹ããããå ŽåããµãŒãã¬ããã³ã³ããã¯ãããããžã§ãã«é©ããããŒã«ã§ã¯ãããŸããã
ãããŠã Nettyã§ãªãå Žåã¯ããªã¢ã¯ãã£ãã«äœãæžãã¹ãã§ããïŒ è£žã®Nettyã§ããã¯ãšã³ããæžãã®ã¯éªšãæããããšã¯æ³šç®ã«å€ããŸãããæœè±¡åãè¡ããšäŸ¿å©ã§ãã
Nettyã«é©ãããµãŒããŒæœè±¡åã¯ããã»ã©å€ããªããããPivoââtalã®ã¹ã¿ããã¯ã Spring Boot 2ã§Nettyã®ãµããŒãã远å ããŸããã 2018幎3æ1æ¥ã«ãããããã¹ãŠãéå§ãããŸãã ã éåžžã«æºè¶³ãããããã«ã圌ãã¯WebFluxã¢ãžã¥ãŒã«ãäœæããŸãããããã¯ã Spring MVCã«ä»£ãããã®ã§ãããWebãµãŒãã¹ãèšè¿°ããããã®äºåŸå¯Ÿå¿çãªã¢ãããŒãã§ãã
WebFluxã¯èªåèªèº«ããã€ã¯ããã¬ãŒã ã¯ãŒã¯ïŒãã€ã¯ããã¬ãŒã ã¯ãŒã¯ãšSpringãhahaïŒãšããŠäœçœ®ä»ãããããã®ïŒç§ãã¡ã®ïŒãã¡ãã·ã§ããã«ãªãã€ã¯ããµãŒãã¹ã«é©åããããšãçŽæããAPIãæ©èœçãªã¹ã¿ã€ã«ã§æç€ºãã ãã§ã«Habréã§èšåãããŠããŸã ã 詳现ïŒSpring MVCãšã®éããå«ãïŒã¯ã ããã«ãããŸã ã ãããã仿¥ã¯äœãä»ã®ãã®ã«ã€ããŠã WebFluxã¯ãReactorã©ã€ãã©ãªã«åºã¥ããŠããŸãã 圌女ã«ã€ããŠè©±ããŸãããã
Reactorã¯ãPivoââtalãéçºãããªã¢ã¯ãã£ãïŒçªç¶ïŒãªãŒãã³ãœãŒã¹ãã©ãããã©ãŒã ã§ãã ç§ã¯ã ãã®çŽ æŽããã峿žé€šã®ç޹ä»ãç¡æã§ïŒã³ã¡ã³ãä»ãã§ïŒèªãçŽãããšã«ããŸããã
è¡ãã
ãããã¯ã³ãŒãïŒå°ããªã³ãŒãçšïŒ
Javaã³ãŒãã¯éåžžãããã¯ããŠããŸãã ããšãã°ãHTTPçµç±ã®åŒã³åºããããŒã¿ããŒã¹ãžã®ã¯ãšãªã¯ããµãŒãããŒãã£ã®ãµãŒãã¹ãå¿çãããŸã§çŸåšã®ã¹ã¬ããããã³ã°ãããŸãã ãµãŒãã¹ã蚱容å¯èœãªæéãæ
åœããŠããå Žåãããã¯éåžžã®æ¹æ³ã§ãã ãã以å€ã®å Žåããã®ã±ãŒã¹ã¯ããã«ããã¯ã«ãªããŸãã 䞊ååãäœåãªããããåãããããã³ã°ã³ãŒããå®è¡ããã¹ã¬ãããããã«å®è¡ããŸãã ãã®éçšã§ãç«¶åãšç«¶äºåã®åé¡ã解決ããå¿
èŠããããŸãã
ç¹ã«I / Oãåå ã§é »ç¹ã«ãããã¯ãããïŒãããŠãå€ãã®ã¢ãã€ã«ã¯ã©ã€ã¢ã³ããããå Žåã I / OããŸã£ããé«éã§ã¯ãªã ïŒããã倿°ã®ã¹ã¬ãããããŒã¿ãåŸ
æ©ããã³ã³ããã¹ãã®åãæ¿ããªã©ã«è²ŽéãªãªãœãŒã¹ãè²»ãããŠããŸãã
䞊ååã¯ããã¹ãŠã®åé¡ã解決ããéæ³ã®æã§ã¯ãããŸããã ããã¯ããªãŒããŒãããã䌎ãè€éãªããŒã«ã§ãã
éåæ&&ãã³ããããã³ã°
ãããã®çšèªã¯èŠã€ãããããçè§£ãã«ãããå¿ããããšãã§ããŸããã ãããããããã¯åå¿æ§ã«é¢ããŠãã°ãã°çŸããã®ã§ãããããçè§£ããŠã¿ãŸãããã
äžèšã®ããã¹ããããããããã³ã°ã³ãŒãããã¹ãŠã®åå ã§ãããšçµè«ä»ããããšãã§ããŸãã OKããã³ããããã³ã°ãæžãå§ããŸãããã ããã¯ã©ãããæå³ã§ããïŒ çµæãæäŸããæºåããŸã æŽã£ãŠããªãå Žåã¯ããããåŸ
ã€ä»£ããã«ãåŸã§ãªã¯ãšã¹ããç¹°ãè¿ãããã«èŠæ±ãããªã©ãäœããã®ãšã©ãŒãçºçãããŸãã ãã¡ããã¯ãŒã«ã§ããããã®ééãã§äœãããŸããïŒ ãããã£ãŠãåŸã§çãã«å¿çããããã«éåæåŠçãååŸããŸãããã¹ãŠæºåãã§ããŸããïŒ
éåæã§ãã³ããããã³ã°ã®ã³ãŒããæžãå¿
èŠããããŸããããã¹ãŠããŸããããŸããïŒ ãããã圌ã¯ããŸããã ããããããã¯äººçãæ¥œã«ããããšãã§ããŸãã ãããè¡ãããã«ã芪åã§è³¢ã人ã
ãããããçš®é¡ã®ä»æ§ïŒãªã¢ã¯ãã£ããªä»æ§ãå«ãïŒãçºæãããããã®ä»æ§ãå°éããã©ã€ãã©ãªãŒãèŠã€ããŸããã
ããã§ããªã¢ã¯ã¿ãŒã éåžžã«çãå Žå
å®éãReactorïŒå°ãªããšãã³ã¢éšåïŒã¯ã Reactive Streams仿§ã®å®è£
ã§ããã ReactiveXæŒç®åã®äžéšã§ãã ããããããã«ã€ããŠã¯åŸã§ã
RxJavaã«ç²ŸéããŠããããŸãã¯èããŠããå ŽåãReactorã¯RxJavaã®ã¢ãããŒããšå²åŠãå
±æããŠããŸãããå€ãã®ã»ãã³ãã£ãã¯ã®éãããããŸãïŒRxJavaãšAndroidéçºã®æ©èœãšã®åŸæ¹äºææ§ã®ããã«å€§ãããªããŸãïŒã
Javaã®ãªã¢ã¯ãã£ãã¹ããªãŒã ãšã¯äœã§ããïŒ
倱瀌ãªå Žåã reactive-streams-jvmã©ã€ãã©ãªã«è¡šç€ºããã4ã€ã®ã€ã³ã¿ãŒãã§ã€ã¹ããããŸãã
- åºç瀟
- å å
¥è
- å®æè³Œèª
- ããã»ããµãŒ
ãããã®æ£ç¢ºãªã³ããŒã¯ãFlowã¯ã©ã¹9ã«ååšããŸãã
ããã«å€±ç€Œãªå Žåã圌ãã¯ãã¹ãŠæ¬¡ã®èŠä»¶ãèãåºãïŒ
- ASYNC-éåæ;
- NIO-å
¥å/åºåã®ããã³ããããã³ã°ãã
- RESPECT BACKPRESSURE-ããŒã¿ãæ¶è²»ããããããéã衚瀺ãããç¶æ³ãåŠçããæ©èœïŒåæãåœä»€åã³ãŒãã§ã¯ãåæ§ã®ç¶æ³ã¯çºçããŸãããããªã¢ã¯ãã£ãã·ã¹ãã ã§ã¯ãããããèŠãããŸãïŒã
JDK 9ã®Flowã¯ã©ã¹ã®ã³ãŒããèŠãŠã¿ãŸãããïŒç°¡æœã«ããããJavadocã³ã¡ã³ãã¯åé€ãããŠããŸãïŒã
public final class Flow { public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } public static interface Subscription { public void request(long n); public void cancel(); } public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { } }
ãããŸã§ã®ãšãããããã¯ãã¹ãŠJDKã¬ãã«ã®åå¿æ§ãµããŒãã§ãã ã€ã³ãã¥ããŒã¿ãŒã¢ãžã¥ãŒã«ã®ã©ããã§ãHTTP / 2ã¯ã©ã€ã¢ã³ããæçããŠãããFlowãã¢ã¯ãã£ãã«äœ¿çšãããŠããŸãã JDK 9å
ã§ä»ã®çšéã¯èŠã€ãããŸããã§ããã
çµ±å
Reactorã¯ã CompletableFutureãStreamãDurationãªã©ããæ°ã«å
¥ãã®Java 8 Pribludaã«çµ±åãããŠããŸãã IPCã¢ãžã¥ãŒã«ããµããŒãããŸã ã Akkaããã³RxJavaçšã®ã¢ããã¿ãŒ ã ãã¹ãã¢ãžã¥ãŒã«ïŒ ãã¹ãã®äœæçšïŒãããã³è¿œå ïŒãŠãŒãã£ãªãã£ã¯ã©ã¹ïŒããããŸãã
Redisãã¡ã³ã®å Žåã ã¬ã¿ã¹/ redissonã¯ã©ã€ã¢ã³ãã«ã¯ãReactorããµããŒããããªã¢ã¯ãã£ãAPIããããŸãã
MongoDBã®ãã¡ã³ã«ã¯ãReactive Streamsãå®è£
ããå
¬åŒã®ãžã§ãããã©ã€ããŒããããŸããããããReactorããããç°¡åã«éžæã§ããçç±ã§ãã
ããŠãã©ã®ããã«ãã¹ãŠãå§ããŸãããïŒ
ãããã¯ãã¹ãŠJDK8以éã§å®è¡ã§ããŸãã ãã ããAndroidãšèªåã®ãã®ïŒminSdk <26ïŒã䜿çšããŠããå Žåã¯ãRxJava 2ã確èªããããšããå§ãããŸãã
Mavenãããå Žå <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> </dependencies>
ãããããã®å Žå plugins { id "io.spring.dependency-management" version "1.0.1.RELEASE" } dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } } dependencies { compile 'io.projectreactor:reactor-core' }
BOMã¯ãReactorã®ããŸããŸãªéšåã®éã®äºææ§ãé«ããããã«äœ¿çšãããŸãã Gradleã«ã¯ãã€ãã£ãã®BOMãµããŒãããªãããããã©ã°ã€ã³ãå¿
èŠã§ã ã
Reactorã¯KotlinããµããŒãããŠããŸã ã
äŸ
ãã®ãããéåæã®éããããã³ã°ã³ãŒããèšè¿°ããå¿
èŠããããŸãã èšãæãããšãçŸåšã®å®è¡ã¹ã¬ãããããã¯ããŠåŸ
æ©ããã®ã§ã¯ãªããæçšãªãã®ã«åãæ¿ããŠãéåæåŠçãå®äºãããšãã«çŸåšã®ããã»ã¹ã«æ»ãããšãèš±å¯ããŸãã
JavaãšåŒã°ããæ¥åœããã®è¯ãå³¶ã§ã¯ãããã«ã¯äž»ã«2ã€ã®æ¹æ³ããããŸãã
ãããã¯ããç¥ãããããŒã«ã§ãããããæç¹ã§äžååã«ãªããŸãã
ã³ãŒã«ããã¯ã®åé¡
ã³ãŒã«ããã¯ã¯äœæãé£ãããããã«ãã³ãŒã«ããã¯å°çããšåŒã°ããããã·ã¥ã«å€ãããŸãã
äŸãèŠãŠã¿ãŸããã
ãŠãŒã¶ãŒã«äžäœ5ã€ã®ããŒã ã衚瀺ããå¿
èŠããããŸããããã§ãªãå Žåã¯ããªãã¡ãŒãµãŒãã¹ã«ç§»åããŠããããã5ã€ã®ããŒã ãååŸããŸãã
åèšã§3ã€ã®ãµãŒãã¹ãé¢ä¿ããŠããŸãã1ã€ç®ã¯ãŠãŒã¶ãŒã®ãæ°ã«å
¥ãã®ããŒã ã®IDãæäŸãã2ã€ç®ã¯ããŒã èªäœãååŸãã3ã€ç®ã¯ãæ°ã«å
¥ãã®ããŒã ããªãå Žåã«ãªãã¡ãŒãæäŸããŸãã
ã©ãããã¯ãŒã«ã§ã¯ãªãããã§ãã
ããã§ã¯ãReactorã§ã©ã®ããã«è¡ãããèŠãŠã¿ãŸãããã
ãããã800ããªç§ã®ã¿ã€ã ã¢ãŠãã§çªç¶èœã¡ããã£ãã·ã¥ãããããŒã¿ãããŒããããå Žåã¯ã©ãã§ããããïŒ
userService.getFavoriteMemes(userId) .timeout(Duration.ofMillis(800))
Reactorã§ã¯ãã³ãŒã«ãã§ãŒã³ã«ã¿ã€ã ã¢ãŠãã¹ããŒãã¡ã³ãã远å ããã ãã§ãã ã¿ã€ã ã¢ãŠãã¯äŸå€ãã¹ããŒããŸãã onErrorResumeæŒç®åã䜿çšããŠããšã©ãŒãçºçããå Žåã«ããŒã¿ãååŸãã代æ¿ïŒãã©ãŒã«ããã¯ïŒãœãŒã¹ãæå®ããŸãã
20ã®ã³ãŒã«ããã¯ïŒ8ããããCompletableFutureããããŸã
ååãšçµ±èšãèŠæ±ããããããããŒãšå€ã®ãã¢ã®åœ¢åŒã§çµåããIDã®ãªã¹ãããããŸããããã¯ãã¹ãŠéåæã§ãã
CompletableFuture<List<String>> ids = ifhIds();
Reactorã§ãããè¡ãã«ã¯ã©ãããã°ããã§ããïŒ
Flux<String> ids = ifhrIds(); Flux<String> combinations = ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id);
ãã®çµæãæ§æå¯èœã§èªã¿åãå¯èœãªé«ã¬ãã«APIïŒ å®éã«ã¯ãåãã¹ã¿ã€ã«ã§éåæã³ãŒããèšè¿°ããæ¹æ³ãå¿
èŠã ã£ããããæåã¯Reactorã䜿çšããŸãã ïŒãããã³ãã®ä»ã®å©ç¹ïŒé
å»¶å®è¡ãbackPressure管çãããŸããŸãªã¹ã±ãžã¥ãŒã©ïŒã¹ã±ãžã¥ãŒã©ãŒïŒããã³çµ±åã
ããŠãä»ã®FluxãšMonoã¯äœã§ããïŒ
FluxãšMonoã¯ã2ã€ã®äž»èŠãªReactorããŒã¿æ§é ã§ãã
ãã©ãã¯ã¹

Fluxã¯Publisherã€ã³ã¿ãŒãã§ãŒã¹ã®å®è£
ã§ããã0ããNåã®èŠçŽ ã®ã·ãŒã±ã³ã¹ã§ãããçµäºããå ŽåããããŸãïŒãšã©ãŒãããå Žåãå«ãïŒã
Fluxã·ãŒã±ã³ã¹ã«ã¯3ã€ã®æå¹ãªå€ããããŸãïŒã·ãŒã±ã³ã¹ãªããžã§ã¯ããå®äºä¿¡å·ããŸãã¯ãšã©ãŒä¿¡å·ïŒããããonNext ã onCompleteãããã³onErrorã¡ãœããã®åŒã³åºãïŒã
3ã€ã®å€ã¯ãããããªãã·ã§ã³ã§ãã ããšãã°ãFluxã¯ç¡éã®ç©ºã®ã·ãŒã±ã³ã¹ã«ããããšãã§ããŸãïŒã¡ãœããã¯åŒã³åºãããŸããïŒã ãŸãã¯ãæåŸã®ç©ºã®ã·ãŒã±ã³ã¹ïŒonCompleteã®ã¿ãåŒã³åºãããŸãïŒã ãŸãã¯ãå€ã®ç¡éã·ãŒã±ã³ã¹ïŒonNextã®ã¿ãåŒã³åºãããŸãïŒã ç
ããšãã°ã Flux.intervalïŒïŒã¯ã Flux <Long>ã¿ã€ãã®ãã£ãã¯ã®ç¡éã·ãŒã±ã³ã¹ãè¿ããŸã ã
ãã¶ã€ã³ãèŠãïŒ
Flux .interval(Duration.ofSeconds(1)) .doOnEach(signal -> logger.info("{}", signal.get())) .blockLast();
次ã®ããã¹ãã衚瀺ãããŸãã
12:24:42.698 [parallel-1] INFO - 0 12:24:43.697 [parallel-1] INFO - 1 12:24:44.698 [parallel-1] INFO - 2 12:24:45.698 [parallel-1] INFO - 3 12:24:46.698 [parallel-1] INFO - 4 12:24:47.699 [parallel-1] INFO - 5 12:24:48.696 [parallel-1] INFO - 6 12:24:49.696 [parallel-1] INFO - 7 12:24:50.698 [parallel-1] INFO - 8 12:24:51.699 [parallel-1] INFO - 9 12:24:52.699 [parallel-1] INFO - 10
doOnEachïŒConsumer <T>ïŒã¡ãœããã¯ãã·ãŒã±ã³ã¹å
ã®åèŠçŽ ã«å¯äœçšãé©çšããŸããããã¯ããã®ã³ã°ã«äŸ¿å©ã§ãã
blockLastïŒïŒã«æ³šæããŠãã ãã ïŒas ã·ãŒã±ã³ã¹ã¯ç¡éã§ãããåŒã³åºããçºçãããããŒã¯ç¡éã«åŸ
æ©ããŸãã
RxJavaã«ç²ŸéããŠããå ŽåãFluxã¯Observableã«éåžžã«äŒŒãŠããŸãã
ã¢ã
Monoã¯Publisherã€ã³ã¿ãŒãã§ãŒã¹ã®å®è£
ã§ãããäœããã®éåæèŠçŽ ãŸãã¯ãã®äžåšMono.emptyïŒïŒã§ãã

Fluxãšã¯ç°ãªããMonoã¯1ã€ããã¢ã€ãã ãè¿ããŸããã Fluxãšåæ§ã«ã onCompleteïŒïŒããã³onErrorïŒïŒã®åŒã³åºãã¯ãªãã·ã§ã³ã§ãã
Monoã¯ããRunnableã«äŒŒããæ»ãçµæãªãã§ããå®äºããŠå¿ãããã¹ã¿ã€ã«ã®éåæã¿ã¹ã¯ãšããŠã䜿çšã§ããŸãã ãããè¡ãã«ã¯ãMono <Void>ãšããŠå®£èšããç©ºã®æŒç®åã䜿çšããŸãã
Mono<Void> asyncCall = Mono.fromRunnable(() -> {
RxJavaã«ç²ŸéããŠãããªããSingle + Maybeããã«ã¯ãã«ãšããŠMonoãåããŸããã
ãã®åé¢ã¯ãªãã§ããïŒ
FluxãšMonoã«åé¢ãããšããªã¢ã¯ãã£ãAPIã®ã»ãã³ãã£ã¯ã¹ãåäžãã衚çŸåã¯ååã«ãªããŸãããåé·ã§ã¯ãããŸããã
çæ³çã«ã¯ãæ»ãå€ãèŠãã ãã§ãã¡ãœããã®åäœãçè§£ã§ããŸãïŒããçš®ã®åŒã³åºãïŒMono <Void>ïŒãèŠæ±å¿çïŒMono <T>ïŒããŸãã¯ããŒã¿ã¹ããªãŒã ãè¿ãïŒFlux <T>ïŒã
FluxãšMonoã¯ã»ãã³ãã£ã¯ã¹ã䜿çšããäºãã«æµããŸãã Fluxã«ã¯Mono <T>ãè¿ãåäžã®ïŒïŒã¡ãœããããããMonoã«ã¯ãã§ã«Flux <T>ãè¿ãconcatWithïŒMono <T>ïŒã¡ãœããããããŸãã
ãŸããç¬èªã®æŒç®åããããŸãã ã·ãŒã±ã³ã¹å
ã®Nåã®èŠçŽ ïŒFluxïŒã§ã®ã¿æå³ãæã€ãã®ãããã°ãéã«1ã€ã®å€ã®ã¿ã«é¢é£ãããã®ããããŸãã ããšãã°ãMonoã«ã¯orïŒMono <T>ïŒããããFluxã«ã¯limit / takeã¹ããŒãã¡ã³ãããããŸãã
ãã®ä»ã®äŸ
Flux / Monoãäœæããæãç°¡åãªæ¹æ³ã¯ããããã®ã¯ã©ã¹ã§æç€ºãããå€ãã®ãã¡ã¯ããªã¡ãœããã®1ã€ã䜿çšããããšã§ãã
æºåå€ã§ãã©ãã¯ã¹ãåæåãã Flux<String> sequence = Flux.just("foo", "bar", "foobar");
å埩å¯èœããåæåã§ããŸã List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> sequence = Flux.fromIterable(iterable);
ãµãŒãããŒãã£ã®ãããªãã·ã£ãŒããã§ããŸã Publisher<String> publisher = redisson.getKeys().getKeys(); Flux<String> from = Flux.from(publisher);
ããŠãããããããšãã§ããŸã Mono<String> noData = Mono.empty();
FluxãšMonoã¯æ ãè
ã§ãã ããçš®ã®åŠçãéå§ããMonoãšFluxã«ããããŒã¿ã掻çšããã«ã¯ã .subscribeïŒïŒã䜿çšããŠãããããµãã¹ã¯ã©ã€ãããå¿
èŠããããŸãã
ãµãã¹ã¯ã©ã€ãã¯ãé
å»¶åäœãä¿èšŒãããšåæã«ãããŒã¿ã§äœãè¡ãå¿
èŠãããããç€ºãæ¹æ³ã§ãã ãµãã¹ã¯ã©ã€ãã¡ãœããã¯ãJava 8ã®ã©ã ãåŒããã©ã¡ãŒã¿ãŒãšããŠäœ¿çšããŸãã
åºå1ã2ã3 Flux<Integer> ints = Flux.range(1, 3); ints.subscribe(i -> System.out.println(i));
次ãåºåããŸãã
1 2 3
åºå1ã2ã3ããã³ãšã©ãŒ Flux<Integer> ints = Flux.range(1, 4) .map(i -> { if (i <= 3) { return i; } throw new RuntimeException("Got to 4"); }); ints.subscribe( i -> System.out.println(i), error -> System.err.println("Error: " + error) );
次ãåºåããŸãã
1 2 3 Error: java.lang.RuntimeException: Got to 4
åºå1ã2ã3ã4ãããã³å®äº Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done"); });
次ãåºåããŸãã
1 2 3 4 Done
ããã©ã«ãã§ã¯ããããã¯ãã¹ãŠçŸåšã®ã¹ã¬ããã§æ©èœããŸãã å®è¡ã®ãããŒã¯ãããšãã°.publishOnïŒïŒæŒç®åã䜿çšããŠå€æŽããããã«é¢å¿ã®ããã¹ã±ãžã¥ãŒã©ãæž¡ãããšãã§ããŸãïŒSchedulerã¯ExecutorServiceã®ãããªã²ããã§ãïŒã
å®è¡ã®æµããå€ãã Flux<Integer> sequence = Flux.range(0, 100).publishOn(Schedulers.single());
以äžãåºåããŸãïŒ100åïŒïŒ
n = 0 Thread.currentThread() = Thread[single-1,5,main]
ã©ã®ãããªçµè«ãå°ãåºãããšãã§ããŸããïŒ
- CompletableFutureã«æ¬æãæã£ãŠãAPIãæ§æå¯èœæ§ãããã³å¯èªæ§ã¯è²§åŒ±ã§ãã
- Reactorã䜿çšãããšããããã¯ãèŠåŽãããã«éåæããŒã¿ã¹ããªãŒã ãæäœã§ããŸãã
- æ®å¿µãªãããããã¯ãšã³ãåŽããã¯ãå®å
šãªãªã¢ã¯ãã£ãã·ã¹ãã ã®æ§ç¯ã劚ããããã€ãã®çç±ããããŸãïŒããšãã°ããã©ã€ããŒã®ãããã¯ïŒã
- åå¿æ§ã¯ã³ãŒãã®çç£æ§ãé«ãããã®ã§ã¯ãããŸããããã¹ã±ãŒã©ããªãã£ã¯åäžããŸãã
- ä»ããReactorã䜿çšããŠãã¢ããªã±ãŒã·ã§ã³å
ã®ããŒã¿ã管çã§ããŸãã
è峿·±ãã¬ãã¥ãŒã倿ããŸããïŒãããïŒã ããªããèå³ãæã£ãŠããå Žå-æžããŠãç§ãã¡ã¯äœãèµ·ãã£ãŠããããæãäžããŸãã æ°è»œã«ã³ã¡ã³ãããŠãã ããïŒ
ãæž
èŽããããšãããããŸããïŒ
Reactorã®ããã¥ã¡ã³ãã«åºã¥ã
ãã®ææžã®ã³ããŒã¯ãå°å·ç©ãŸãã¯é»åçã«é
åžããããã©ããã«é¢ä¿ãªãããã®ãããªã³ããŒã«æéãè«æ±ãããããã«åã³ããŒã«ãã®èäœæš©è¡šç€ºãå«ãŸããŠããå ŽåããŠãŒã¶ãŒèªèº«ã®äœ¿çšããã³ä»è
ãžã®é
åžã®ããã«äœæããããšãã§ããŸãã
ç§ã¯ããã«ããŸãããããã£ãšãµããããç·æ§ãããŸãã ãããŠè²¢ç®è
/ã¡ã³ãããŒã