
RxJavaã«ãããªã¢ã¯ãã£ããã°ã¹ããªãŒã åŠç-ããŒãl
以åã®æçš¿ã§ãèè
ã¯ELKã¹ã¿ãã¯ã䜿çšããŠãã°ãåéããã±ãŒã¹ãæ€èšããŸããã
ãã€ã¯ããµãŒãã¹ãšã¢ããªã±ãŒã·ã§ã³ã®ã³ã³ããåãžã®åããèæ
®ãããšããã°ãšãã®ã¹ãã¬ãŒãžã®éäžåŠçãäºå®äžã®æšæºã«ãªãã€ã€ãããŸãã
次ã®ã¹ãããã«é²ãã§ãåãåã£ãæ
å ±ãããç©æ¥µçã«äœ¿çšããŠãå€ãã®åé¡ãçºçãããã£ãšåã«åå ãèŠã€ããå¿
èŠããããããããŸããã
èæ³š-ãã®ç¿»èš³ã®ã¹ããªãŒã ãšããŒã¿ã¹ããªãŒã ã¯äº€æå¯èœãªåèªã§ãã ãŸãããã°ãšããèšèã¯ãã°ãæå³ããå ŽåããããŸãããã»ãšãã©ã®å Žåãããã¹ãã§ã¯å¥ã®æå³ã䜿çšããŠããŸã
ã€ãã³ããã°ãã·ã¹ãã ã§ãªã¢ã«ã¿ã€ã ã§çºçããŠããããŒã¿ã¹ããªãŒã ãšèŠãªãå ŽåãããŒã¿ãšãªã¢ã«ã¿ã€ã ã§äœ¿çšå¯èœãªãã¹ãŠã®ãªãã·ã§ã³ãåæããããšã¯éåžžã«è峿·±ãã§ããããããšãã°ãããŸããŸãªæ
å ±ã¹ããªãŒã ãçŽæ¥éçŽããŠäžæ£ãªåäœãæ€åºãããæ»æãäžãæ»æè
ãå³åº§ã«ãããã¯ããŸãã ãåŸæ¥ãããŒã¿ãã°ãåéããã€ã³ã·ãã³ãåŸã«èª¿æ»ããã®ã§ã¯ãããŸããã
ãŸãã¯å¥ã®äŸã§ã¯ãç¹å®ã®ã¿ã€ãã®ã€ãã³ãã«å¯Ÿå¿ããã€ãã³ãã®ã¿ããã£ã«ã¿ãŒåŠçïŒ ãã£ã«ã¿ãŒ ïŒããuserIDãšããŠå
±éããŒã§ã°ã«ãŒãåïŒ ã°ã«ãŒãå ïŒããæéãŠã£ã³ããŠã§åèšæ°ãèšç®ãããŠãŒã¶ãŒãç¹å®ã®ã€ãã³ãã§å®è¡ãããã®ã¿ã€ãã®ã€ãã³ãæ°ãååŸããŸãæéã
failedLogStream() .window(5,TimeUnit.SECONDS) .flatMap(window -> window .groupBy(propertyStringValue("remoteIP")) .flatMap(grouped -> grouped .count() .map( failedLoginsCount -> { final String remoteIp = grouped.getKey(); return new Pair<>(remoteIp, failedLoginsCount); })) ) .filter(pair -> pair.get > 10) .forEach(System.out::println);
ä»ã®ã·ã¹ãã ã§èŠæ±ãéå§ãããã®å¿çãããŒã¿ã¹ããªãŒã ãšããŠåŠçã§ããŸããããããµãã¹ã¯ã©ã€ããã ãªã¢ã¯ãã£ãã¹ããªãŒã ãã¬ãŒã ã¯ãŒã¯ã§æç€ºãããã¹ããªãŒã ïŒããŒã¿ã¹ããªãŒã ïŒãåŠçããããã«äœ¿ãæ
£ããæŒç®åã䜿çšã§ããŸãã
æ°ããéçºãã©ãã€ã ãåŠã¶
ã¹ããªãŒã ã®ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ãäœã§ããããçè§£ããŠãããšããã§ãããããã®ããã Kafka Streams SparkãFlinkãªã©ã®å€§ããªãã®ããããã€ããå¿
èŠã¯ãããŸããã
ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ã¯ãã³ããããã³ã° ã€ãã³ãã§ãããã«ãŠã³ã¿ãŒããŒãïŒã¡ãŒã«ãŒããã®ããŒã¿éãæ¶è²»è
ãåä¿¡ããããŒã¿éãè¶
ããªããã£ãŒãããã¯ã¡ã«ããºã ïŒãåããå°æ°ã®ã¹ã¬ããã§ãã¹ã±ãŒãªã³ã°ããã¢ããªã±ãŒã·ã§ã³ã§ãã
Spring5ãããããæå€§ã®ãããã¯ã¯ã Jetããã°ã©ãã³ã°ã®ãµããŒãã§ã ã æ°ããspring-web-reactiveã¢ãžã¥ãŒã«ã¯spring-web-mvcã«äŒŒããã¬ãŒã ã¯ãŒã¯ã§ãRESTãµãŒãã¹ãšãªã¢ã¯ãã£ãWebã¯ã©ã€ã¢ã³ãã®éåæïŒéããããã³ã°ïŒå¿çãéä¿¡ã§ããŸããããã¯ããã®ãœãªã¥ãŒã·ã§ã³ããã€ã¯ããµãŒãã¹ã¢ãŒããã¯ãã£ã«äœ¿çšã§ããå¯èœæ§ã瀺åããŠããŸãã ãªã¢ã¯ãã£ãã¹ããªãŒã ã®æŠå¿µã¯ãSpringã«åºæã§ã¯ãããŸãããããã¯ãã»ãšãã©ã®ãªã¢ã¯ãã£ããã¬ãŒã ã¯ãŒã¯ã§åæããããªã¢ã¯ãã£ãã¹ããªãŒã -jvmãšããå
±éã®ä»æ§ãããããã§ãïŒåœé¢ã¯ãåãååã¯ãªããããããŸãããããã¬ãŒã ã¯ãŒã¯ã®ä»£æ¿ã«ãªãã»ã©ã³ã³ã»ããã¯ã·ã³ãã«ã§ããå¿
èŠããããŸãïŒã
æŽå²çã«ã ãžã§ããã¹ããªãŒã ã¢ãã«ã¯Rx.NETã«ãã£ãŠå°å
¥ãããNetflixã䜿çšããŠRxJavaãšåŒã°ããjavaã«ç§»æ€ãããŸããã åæã«ããã®æŠå¿µã¯Reactive EXtensionsãšåŒã°ããä»ã®èšèªã§ãæ£åžžã«å®è£
ãããŸããã ãã以æ¥ãäŒæ¥ã¯ãžã§ããã¹ããªãŒã ã®ä»æ§ãšåãæ¹åã«åãã£ãŠããŸãã RxJavaã¯ãã€ãªãã¢ã§ãã£ãããã倧å¹
ãªãªãã¡ã¯ã¿ãªã³ã° ïŒã³ãŒãã®æžãçŽãïŒãå¿
èŠã§ãããããã£ãŠãããŒãžã§ã³2.xã¯ä»æ§ã«ããé©ããŠããŸããSpringãªã¢ã¯ã¿ã¯ãŸã æ°ãããã®ã§ãããäŒç€Ÿã仿§ã«åŸã£ãŠå®è£
ãæžãçŽãããšã¯é£ãããããŸããã ããããã©ã®ããã«é¢é£ããŠãããã«ã€ããŠãã£ãšèªãããšããå§ãããŸãã
Doug Leaã¯ãjava.util.concurrent.Flowãªããžã§ã¯ãã«ãžã§ããã¹ããªãŒã ãå«ããããšèšããŸãããããã¯ããžã§ããã¹ããªãŒã ãJava 9ã®äžéšãšããŠé
ä¿¡ãããããšãæå³ããŸãã
ããã©ãŒãã³ã¹ã®å©ç¹
ãŸããå¥ã®æµè¡èªã¯ãå€ãã®ç°ãªããµãŒãã¹ãèŠæ±ããå¿
é ã®æ©èœãæã€ãã€ã¯ããµãŒãã¹ã¢ãŒããã¯ãã£ã§ãã çæ³çã«ã¯ãå®å
šãªå¿çãæ¬¡ã®èŠæ±ãå®äºããã®ãåŸ
ããã«ãéããããã³ã°èŠæ±ãå®è¡ããããšãæåã§ãã ãµãŒãã¹ãçµæã®å€§ããªãªã¹ããè¿ãç¬éãåŸ
ã€ä»£ããã«ãæåã®ãã©ã°ã¡ã³ããåä¿¡ãããšãã«å¥ã®ã·ã¹ãã ã«æ°ãããªã¯ãšã¹ããéä¿¡ãã䟡å€ããããšèããŠãã ããã

ãªã¢ãŒãèŠæ±ããã®å¿çãã¹ããªãŒã ïŒã¹ããªãŒã ããŒã¿ã¹ããªãŒã ïŒãå¿çãåä¿¡ãããšãã«ã¢ã¯ã·ã§ã³ïŒã¢ã¯ã·ã§ã³ïŒãèµ·åãããµãã¹ã¯ãªãã·ã§ã³ãšèŠãªãå Žåãå¿çãåŸ
æ©ããŠããã¹ããªãŒã ããããã¯ãã代ããã«ãäžè¬çã«å°æ°ã®ã¹ããªãŒã ã䜿çšã§ããŸãã ããªãœãŒã¹ã®ã³ã¹ããåæžããŸãïŒããšãã°ãã¹ã¬ããã®ã¹ã¿ãã¯ããšã«ã¹ã¬ãããšã¡ã¢ãªéã§ã³ã³ããã¹ããåãæ¿ããããã®ããã»ããµæéïŒã
ãããã£ãŠããªã¢ã¯ãã£ãããã°ã©ãã³ã°ã䜿çšãããšãæšæºã®ããŒããŠã§ã¢ã§éåžžããå€ãã®ã€ãã³ããã°ãåŠçã§ããŸãã
äŸïŒGmailãªã©ã®ãµãŒãã¹ã§ã¯ããŠãŒã¶ãŒã®ã¡ãŒã«ã衚瀺ããå¿
èŠããããŸãã ãã ããã¡ãŒã«ã«ã¯å€ãã®äººãåã£ãŠããå¯èœæ§ããããŸãïŒCCïŒã é£çµ¡å
ã«ãããŠãŒã¶ãŒã«åçã衚瀺ããã®ã¯æ¥œããã§ããããã€ãŸããREST-ContactServiceãåŒã³åºããŸãã
次ã®ããã«ãªããŸãã
Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); List<Mail> emails = emailsFuture.get(); // // , // , ? Future<List<Contacts>> contacts = getContactsForEmails(emails); for(Mail mail : emails) { streamRenderEmails(mail, contacts); //push() emails }
CompletableFutureã䜿çšããJava 8ã®ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ïŒthenComposeãthenCombineãthenAcceptãããã«50åã®ã¡ãœããã䜿çšïŒã§åé¡ã®äžéšã解決ãããŸããããããã¯ããããè¡ããã¹ãŠãèŠããŠããå¿
èŠããããšããäºå®ãåŠå®ãããã®ã§ã¯ãããŸããããããã¯èªãã®ã«ã¯åœ¹ç«ã¡ãŸããã³ãŒãïŒã
CompletableFuture<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); CompletableFuture<List<Contact>> emailsFuture .thenCompose(emails -> getContactsForEmails(emails)) // List<Mail> .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue()))
Listã®ä»£ããã«Iteratorã«åãæ¿ããããšãã§ããåæã«æ°ããå€ã衚瀺ããããšãã«ã¢ã¯ã·ã§ã³ãå®è¡ããããã«æç€ºããã¡ãœããã¯ãããŸããã SQLã«ã¯ããã®ãããªå¯èœæ§ããããŸããããšãã°ããã¹ãŠã®ããŒã¿ãã¡ã¢ãªã«ããŒããã代ããã«ãResultSetïŒrs.nextïŒïŒãå®è¡ã§ããŸãïŒã§ãã
public interface Iterator<E> { boolean hasNext(); E next(); }
ãããããæ°ããæå³ããããŸããïŒããšåžžã«å°ããå¿
èŠããããŸãã
Iterable<Mail> emails = mailstoreService.getUnreadEmails(); Iterator<Mail> emailsIt = emails.iterator(); while(emailsIt.hasNext()) { Mail mail = emailsIt.next(); // if(mail != null) { .... } }
å¿
èŠãªã®ã¯ããªã¢ã¯ãã£ãã€ãã¬ãŒã¿ã§ããããã¯ãæ°ããå€ãåä¿¡ããããšããã«ãµãã¹ã¯ã©ã€ãããŠã¢ã¯ã·ã§ã³ãå®è¡ã§ããã¿ã€ãã®ããŒã¿ã§ãã ãªã¢ã¯ãã£ãã¹ããªãŒã ããã°ã©ãã³ã°ã¯ããããå§ãŸããŸãã
ããã§ã¯ãã¹ããªãŒã ãšã¯äœã§ããïŒ

ã¹ããªãŒã ã¯ãåã«æéé ã«äžŠã¹ãããã€ãã³ãã®ã·ãŒã±ã³ã¹ã§ã ïŒ ã€ãã³ã Xã¯ã€ãã³ãYã®åŸã«çºçããããã ã€ãã³ãã¯äºãã«ç«¶åããŸãã ïŒã
ã¹ããªãŒã ã¯ã 0..Nã€ãã³ããš2ã€ã®ç«¯æ«æäœã®ãããããçæããããã«ã¢ãã«åãããŠããŸãã
- ããŒã¿ãªãªãŒã¹ãçµäºããããšããµãã¹ã¯ã©ã€ãã«éç¥ããå®äºã€ãã³ã
- ãšã©ãŒã®ããã¹ããªãŒã ã®çµäºãéç¥ãããšã©ãŒã€ãã³ãïŒäŸå€ïŒ
ãããã 倧çç³ã®å³è¡š ãã䜿çšããŠèŠèŠçã«èª¬æã§ããŸãã

ãããã£ãŠãã¹ããªãŒã ã¯åãªãã€ãã³ããã°ã§ã¯ãªãããã¹ãŠã§ãããšæ³åã§ããŸãã åäžã®å€ã§ãã£ãŠããå€ãè§£æŸããã¹ããªãŒã ãšããã«ç¶ãå®äºã€ãã³ããšããŠè¡šçŸã§ããŸãã
ç¡éã¹ããªãŒã -ã€ãã³ããçºè¡ããŸãããåäžã®ç«¯æ«ã€ãã³ãã¯ãããŸããïŒå®äº|ãšã©ãŒïŒã
RxJavaã¯ãã¿ã€ãã®ã¹ããªãŒã ã€ãã³ããã¢ãã«åããããã®ObservableããŒã¿ã¿ã€ããå®çŸ©ããŸãã Spring Reactorã§ã¯ã Fluxã¿ã€ããšåçã§ãã
芳枬å¯èœãªã®ã¯ãããŸããŸãªééã§åãããæž©åºŠã®æµãã§ãã
Observableã¯ãWebã¹ãã¢ãã賌å
¥ãã補åã®ã¹ããªãŒã ã§ãã
Observableã¯ãããŒã¿ããŒã¹ãžã®èŠæ±ã«å¿ããŠè¿ããã1人ã®ãŠãŒã¶ãŒïŒãŠãŒã¶ãŒïŒã衚ããŸãã
public Observable<User> findByUserId(String userId) {...}
ãã ãã Observableã¯åãªãããŒã¿åã§ããããããããªãã·ã¥/ãµãã¹ã¯ã©ã€ããŒã®ãã¶ã€ã³ãã¿ãŒã³ãšåæ§ã«ã3çš®é¡ã®ã€ãã³ããåŠçãããµãã¹ã¯ã©ã€ããŒïŒãµãã¹ã¯ã©ã€ããŒïŒãå¿
èŠã§ãã
Observable<CartItem> cartItemsStream = ...; Subscriber<CartItem> subscriber = new Subscriber<CartItem>() { @Override public void onNext(CartItem cartItem) { System.out.println("Cart Item added " + cartItem); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } }; cartItemsStream.subscribe(subscriber);
ãªã¢ã¯ãã£ããªãã¬ãŒã¿ãŒ
ãã ããããã¯Stream-aã®äžéšã«éããããããŸã§ã®ãšãããåŸæ¥ã®Observerãã¶ã€ã³ãã¿ãŒã³ã®ã¿ã䜿çšããŠãç°åžžãªãã®ã¯äœ¿çšããŠããŸããã
ãªã¢ã¯ãã£ãéšåã¯ãã¹ããªãŒã ãã€ãã³ããçºçããããšãã«å®è¡ãããããã€ãã®é¢æ°ïŒæŒç®å-颿°ïŒãå®çŸ©ã§ããããšãæå³ããŸãã
ããã¯ãå¥ã®ã¹ããªãŒã ïŒ äžå€ã®ã¹ããªãŒã ïŒãäœæãããå¥ã®ã¹ããªãŒã ãäœæã§ããããšãæå³ããŸãã
Observable<CartItem> filteredCartStream = cartStream.filter(new Func1<CartItem, Boolean>() { @Override public Boolean call(CartItem cartItem) { return cartItem.isLaptop(); } }); Observable<Long> laptopCartItemsPriceStream = filteredCartStream.map(new Func1<CartItem, Long>() { @Override public Long call(CartItem cartItem) { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } } });
Observableã¯ã©ã¹ïŒfilterãmapãgroupByã...ïŒã®æŒç®åïŒã¡ãœããïŒã¯Observableãè¿ããããæŒç®åã®ãã§ãŒã³ã䜿çšããŠã©ã ãæ§æãšçµã¿åãããŠçŸãããã®ãæžãããšãã§ããŸãã
Observable<BigDecimal> priceStream = cartStream .filter((cartItem) -> cartItem.isLaptop()). .map((laptop) -> { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } });
äžèšã§ã¯ã priceStream
ãäœæãããŠãäœãèµ·ãããªãããšã«æ³šæããŠãã ããpriceService.getPrice()
ãã¹ããŒãã¡ã³ãã®ãã§ãŒã³ãééããèŠçŽ ããããŸã§åŒã³åºãããŸããã ããã¯ãrxæŒç®åã䜿çšããŠèšç»ã®å€èгãäœæããããšãæå³ããŸãã管çãããããŒã¿ããã§ãŒã³ãäžãæ¹æ³ïŒçœ²åãèšé²ãããŸãïŒã
ãªã¢ã¯ãã£ãããã°ã©ãã³ã°ã®èª¬æãæ±ãããããšãã圌ãã¯éåžžãExcelã·ãŒãã§åè«ããããŠäŸã瀺ããŸããããã§ã¯ãã»ã«ãæŽæ°ããããšãã«åŒã³åºãããæ°åŒãåã«æžã蟌ãŸãããããé çªã«å¥ã®ã»ã«ãæŽæ°ãããã®ã»ã«ããã§ãŒã³å
ã®å¥ã®ã»ã«ãæŽæ°ããŸãã
äœãããªãrx-operatorã®ããã«ããããã®åŒã¯åã«ããŒã¿ãå¶åŸ¡ããã ãã§ããããããããããŒã¿ãé£éãããŸã§äœãããããã£ã³ã¹ãåŸãŸãã
ã€ãã³ãããªãã¬ãŒã¿ãŒã®ãã§ãŒã³ãšãšãã«ã©ã®ããã«ç§»åããããããããçè§£ããããã«ãããå®¶ããå¥ã®å®¶ã«ç§»åããäŸã§ã¯ãã ãŒããŒã¯ãªãã¬ãŒã¿ãŒãšããŠæ©èœããããªãã®å®¶ããã®ãã®ãç§»åãã-Thomas Nildãèšã£ãããã«ã䟿å©ãªã¢ãããžãŒãèŠã€ããŸããã
圌ã®ãµã³ãã«ã³ãŒãã¯æ¬¡ã®ãšããã§ãã
Observable<Item> mover1 = Observable.create(s -> { while (house.hasItems()) { s.onNext(house.getItem()); } s.onCompleted(); }); Observable<Item> mover2 = mover1.map(item -> putInBox(item)); Subscription mover3 = mover2.subscribe(box -> putInTruck(box), () -> closeTruck());

ãããŒããŒ1ã¯ã Observable
ãœãŒã¹ã®äžæ¹ã§ããå®¶ããç©ãåãåºãããšã§ç°åžžå€ãäœæããŸãonNext()
map()
æäœãå®è¡ããonNext()
ã¡ãœããã§ããŒããŒ2ãåŒã³åºããŸãonNext()
ã¡ãœãããåŒã³åºããããšããããŠããããããã¯ã¹ã«è»¢éããŸããããããããã·ã³ã«ããã¯ã¹ãããŒãããonNext()
ã¡ãœããã䜿çšããŠãæåŸã®Subscriber
ïŒãµãã¹ã¯ã©ã€ããŒïŒã§ããããŒããŒ3ãåŒã³åºããŸãã
RxJavaã®éæ³ã¯å©çšå¯èœãªæŒç®åã®å€§ããªã»ããã§ãããããªãã®ä»äºã¯ãããããã¹ãŠçµã¿åãããŠããŒã¿ã®æµããå¶åŸ¡ããããšã§ãã

å€ãã®Streamãªãã¬ãŒã¿ãŒã¯ãRââeactiveXãã¬ãŒã ã¯ãŒã¯ïŒReactive ExtensionsïŒã®äžããäžè¬çãªèšèªïŒRxJavaãRxJSãRx.NETãªã©ïŒã§å®è£
ã§ããã¹ããªãŒã ã§å®è¡ãããã¢ã¯ã·ã§ã³ã瀺ãçšèªéãäœæããã®ã«åœ¹ç«ã¡ãŸãã
Spring Reactorãªã©ã®ãªã¢ã¯ãã£ãã¹ããªãŒã ãæäœããããã«ããŸããŸãªãã¬ãŒã ã¯ãŒã¯ã䜿çšããå Žåã§ãããããã®æŠå¿µãç¥ã£ãŠããå¿
èŠããããŸãïŒãããã®ãã¬ãŒã ã¯ãŒã¯ã«å
±éã®æŒç®åãããããšãæåŸ
ããŠïŒã
ãããŸã§ããã£ã«ã¿ãªã³ã°ãªã©ã®åçŽãªæŒç®åã®ã¿ãèŠãŠããŸããã

ãã£ã«ã¿ãŒæ¡ä»¶ã«è©²åœããèŠçŽ ã®ã¿ãã¹ãããããŸãïŒ1ã€ã®ããŒããŒã¯ããã¹ãŠãããã«å¥ã®ããŒããŒã«è»¢éããã®ã§ã¯ãªãã100ãã«æªæºã®ãã®ã転éããŸãïŒ
ãã ããã¹ããªãŒã ãå€ãã®åå¥ã®ã¹ããªãŒã ã«åå²ã§ããæŒç®åããããŸãgroupBy
Observable<Observable<T>>
ïŒã¹ããªãŒã ã¹ããªãŒã ïŒ-ãããã¯groupBy
ãªã©ã®æŒç®ågroupBy

Observable<Integer> values = Observable.just(1,4,5,7,8,9,10); Observable<GroupedObservable<String, Integer>> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "odd":"even"); Observable<Integer> remergedStream = Observable.concat(oddEvenStream); remergedStream.subscribe(number -> System.out.print(number +" "));
ããªãåçŽãªconcat
æŒç®åãå¶æ°ããã³å¥æ°ã®ã¹ããªãŒã ããåäžã®ã¹ããªãŒã ãäœæãããã®ãµãã¹ã¯ãªãã·ã§ã³ãã»ããã¢ããããŸãã

concat
ã¯ãã¹ããªãŒã ãå®äºããã®ãåŸ
ã£ãŠããå¥ã®ã¹ããªãŒã ã远å ããåã³1ã€ã®ã¹ããªãŒã ãäœæããããšãããããŸãã ãããã£ãŠã奿°ãæåã«è¡šç€ºãããŸãã
ãŸããããšãã°zip
æŒç®åãè¡ãããã«ãå€ãã®ã¹ããªãŒã ãçµåããæ©èœããããŸã

Zip
ã¯ãã¢ãŒã«ã€ããšããŠæ©èœãããããããã»ã©åä»ããããŠããŸãããããžãããŒã®ããã«ïŒãžã£ã±ããã«ïŒã2ã€ã®ã¹ããªãŒã ããã®ã€ãã³ããçµåããããã§ãã

1ã€ã®ã¹ããªãŒã ãã1ã€ã®ã€ãã³ããååŸããå¥ã®ã¹ããªãŒã ããã€ãã³ãã«æ¥ç¶ããŸãïŒãã¢ãäœæããŸãïŒã ãããå®äºãããããã§ãŒã³ãäžã«ç§»åããåã«æ¥çãªãã¬ãŒã¿ãé©çšããŸãã
PSïŒããã¯ããå€ãã®ã¹ããªãŒã ã§ãæ©èœããŸãã
ãã®ããã1ã€ã®ã¹ããªãŒã ãã€ãã³ããããéãçºè¡ãããšããŠãããªã¹ããŒã«ã¯ãããé
ãã¹ããªãŒã ãããªãªãŒã¹ãããè€åã€ãã³ãã®ã¿ã衚瀺ãããŸãã
ã¹ããªãŒã ããåä¿¡ããå€ãã®ãªã¢ãŒãåŒã³åºãããã®å¿çããåŸ
æ©ãããæ©èœãæã€ããšã¯ãå®éã«ã¯éåžžã«äŸ¿å©ã§ãã
äžæ¹ã combineLatest
ãªãã¬ãŒã¿ãŒcombineLatest
ãã€ãã³ãã®ãã¢ã®ãªãªãŒã¹ãcombineLatest
ããã代ããã«ãããé
ãã¹ããªãŒã ããçºè¡ãããææ°ã®ã€ãã³ãã䜿çšããŠãããããããã§ãŒã³ã®ããã«äžæµã«æ¥çããã³è»¢éããæ©èœãé©çšããŸãã

ããã·ã¥ã¢ãââããŒãã«åºã¥ãæèã«åãã£ãŠããŸã
Observable
ãå®éã«äœæãããæ¹æ³ã®äŸãããã€ãèŠãŠã¿ãŸãããã æé·ã®äœæãªãã·ã§ã³ïŒ
log("Before create Observable"); Observable<Integer> someIntStream = Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { log("Create"); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onComplete(); log("Completed"); } }); log("After create Observable"); log("Subscribing 1st"); someIntStream.subscribe((val) -> LOGGER.info("received " + val)); // // (for onError and onComplete) , - log("Subscribing 2nd"); someIntStream.subscribe((val) -> LOGGER.info("received " + val));
ãµãã¹ã¯ã©ã€ããŒã賌èªãããšããã«ã€ãã³ãããµãã¹ã¯ã©ã€ããŒã«éä¿¡ãããŸã ã
ãã®èšèšã䜿çšããŠããããã§ã¯ãªããæ°ããObservableOnSubscribe
ãªããžã§ã¯ããæž¡ããŸãããããã¯ã誰ãããµãã¹ã¯ã©ã€ããããšãã«äœããã¹ããã瀺ããŠããŸãã
Observable
ã«ãµãã¹ã¯ã©ã€ããããŸã§ãåºåã¯ãªããäœãèµ·ãããŸãããããŒã¿ã¯ç§»åããŸããã
誰ãããµã€ã³ã¢ãããããšã call()
ã¡ãœãããcall()
ã3ã€ã®ã¡ãã»ãŒãžããã§ãŒã³ã«ããã·ã¥ããŠã³ããããã®åŸã«ã¹ããªãŒã ãçµäºãããšããã·ã°ãã«ãç¶ããŸãã
2åãµã€ã³ã¢ããããcall(...)
ã call(...)
ã¡ãœããå
ã®ã³ãŒãã2ååŒã³åºãããŸãã ãããã£ãŠãä»ã®èª°ãããµãã¹ã¯ã©ã€ãããåºåçšã«æ¬¡ã®å€ãåãåããšããã«ãåãå€ã广çã«è»¢éããŸãã
mainThread: Before create Observable mainThread: After create Observable mainThread: Subscribing 1st mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed mainThread: Subscribing 2nd mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed
rxæŒç®åã¯å¿
ããããã«ãã¹ã¬ãããæå³ããããã§ã¯ãªãããšã«æ³šæããããšãéèŠã§ãã RxJavaã¯ObservableãšSubscriberã®éã®ããã©ã«ãã®ç«¶åãå®è£
ããŸããã ãããã£ãŠããã¹ãŠã®åŒã³åºãã¯ã ã¡ã€ã³ ãã¹ã¬ããã§çºçããŸãã
誰ãããµã€ã³ã¢ããããããšãã«åºããå§ããObservable
ã®ã¿ã€ãã¯ã cold observables
ãšåŒã°ããŸãã å¥ã®ãã¥ãŒã¯hot observables
ã誰ããã©ããŒããŠããªãå Žåã§ãã€ãã³ããçºè¡ã§ããŸãã
Cold Observables
ã誰ãããµã€ã³ã¢ãããããšãã«Cold Observables
ã€ãã³ãã®é
ä¿¡ãéå§ããŸãã åãµãã¹ã¯ã©ã€ããŒã¯åãã€ãã³ããåãåããŸãã ããšãã°ããã¬ãŒã€ãŒã§CDããªã³ã«ããŠèŽãã人ãåãæ²ãåçããCDãšã㊠ãHot Observables
ã€ãã³ãã¯ããŸã 誰ã賌èªããŠããªãå Žåã§ãé
ä¿¡ãããŸãã 誰ããªã³ã«ããŠããªãå Žåã§ããæŸéãéããŠæãåçããã©ãžãªå±ã®ããã« ã ã©ãžãªããªã³ã«ãããšããšåæ§ã«ãéå»ã®ã€ãã³ããã¹ãããããŸãã ååžãå¶åŸ¡ã§ããªãããããªãã¶ãŒããã«ã¢ãã«ã€ãã³ãã ã€ãã³ãããã°ïŒã€ãã³ããã°ïŒã«æžã蟌ãå Žåãšã»ãŒåãã§ãã
Subjects
ã¯ãã®ãããªç¹å¥ãªçš®é¡ã®Observable
ã§ããã Observer
ïŒ Subscriber
ããã«ïŒ onNext()
åŒã³åºããŠonNext()
ããŒã¿ãããã·ã¥ã§ããããšã決å®ããïŒ Observer
ã§ãããããããObservables
å®è£
ã容æã«ããŸãã ReplaySubject
ãªã©ã®å€ãã®å®è£
ããããéžæããã€ãã³ãããããã¡ãŒã«ä¿åããŠãµãã¹ã¯ãªãã·ã§ã³ã§åçããŸãïŒãã¡ããã OutOfMemory
ãšã©ãŒãé²ãããã«ãããã¡ãŒã®ãµã€ãºãæå®ã§ããŸãïŒãã PublishSubject
ã¯çœ²ååŸã«çºçããã€ãã³ãã®ã¿ãã¹ãããããŸãã
ãããŠãã¡ãããä»ã®ãœãŒã¹ããObservables
ãäœæããããã®å€ãã®éçã¡ãœããããããŸãã
Observable.just("This", "is", "something") Observable.from(Iterable<T> collection) Observable.from(Future<T> future) - , `future`
ããã·ã¥ã§éä¿¡ãããããŒã¿ã®ELKã¹ã¿ãã¯RabbitMQãšããã¿ãŒãžã®è¿œå
äŒçµ±çã«ãELKã¹ã¿ãã¯ã䜿çšããå ŽåãElasticSearchã䜿çšããŠã€ãã³ããã°ããããŒã¿ãèŠæ±ããããããã«ããŒã¹ã®ããŒãªã³ã°ã¹ã¿ã€ã«ã§ãããšèšããŸãã
代ããã«ãã€ãã³ããçºçããç¬éããåŠçãéå§ãããŸã§ã®ã€ãã³ããžã®å¿çæéãããã«ççž®ããããã«ãã€ãã³ãããžã£ãŒãã«ã«è¡šç€ºããããšãã«ã峿ãã«éç¥ããããã·ã¥ããŒã¹ã䜿çšã§ããŸããïŒ
倿°ã®å¯èœãªãœãªã¥ãŒã·ã§ã³ã®1ã€ã¯RabbitMqã§ããããã¯ãèšå€§ãªæ°ã®ã¡ãã»ãŒãžãåŠçã§ããããšãããããã©ãŒãã³ã¹ã§éåžžã«é«ãè©äŸ¡ãåŸãŠããæŠéã§ã®çµéšè±å¯ãªãœãªã¥ãŒã·ã§ã³ã§ãã ããã«ããããããã Logstashã¯æ¢ã«RabbitMQãã©ã°ã€ã³ããµããŒãããŠããŸãïŒ FluentDã«ã¯å¥ã®ãã©ã°ã€ã³ããããŸãïŒããããæ¢åã®ELKã¹ã¿ãã¯ã«ç°¡åã«çµ±åããElasticSearchãšRabbitMQã«ãã°ãæžã蟌ãããšãã§ããŸãã
ããããã Logstashã¯ã³ã³ãããŒã©ãŒã®ããã«æ¯ãèãããšãã§ãããããã©ã®ããã«æ©èœãããããã°ã«èšé²ãããã€ãã³ããéä¿¡/ä¿åããå Žæãéžæã§ããããšãèŠããŠããã§ãããã ããã¯ãåŠçãããã€ãã³ããé€å€ããããä»ã®RabbitMQãã¥ãŒãªã©ãžã®éä¿¡å
ã瀺ãããã§ããããšãæå³ããŸãã
Logstashã®äœ¿çšãçç¥ãããå Žåã¯ãLogback Appenderãä»ããŠRabbitMQã«ããŒã¿ãçŽæ¥éä¿¡ãããªãã·ã§ã³ããããŸãã
ãšããã§ïŒããããAmqpAppender
ã¯ã RabbitMQ AMQPã®ç¹å®ã®å®è£
ã§ãïŒãããã³ã«ããŒãžã§ã³AMQP 0-9-1ã0-9ïŒã
ããšãã°ãActiveMQïŒAMQPã³ãã¯ã¿ããµããŒãããŠããïŒã¯ãããã³ã«ããŒãžã§ã³AMQP 1.0ãå®è£
ããŠããããã§ãããspring-amqpã©ã€ãã©ãªã«ã¯ãããã³ã«ããŒãžã§ã³0-9-1ã0-9ãããã1.0ãšã¯ãŸã£ããç°ãªããŸãïŒã¿ã€ã'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'
ãã ããç§ãã¡ã®ãœãªã¥ãŒã·ã§ã³ã¯ã logstash-logback-encoderã䜿çšãããã©ãŒããããããJSONãã€ãã³ããã°ãšãšãã«Logstashã«éä¿¡ããããšã§ãã ã logstashã®åºåã亀æãã€ã³ãRabbitMQïŒäº€æïŒã«ãªãã€ã¬ã¯ãããŸãã
docker- composeã䜿çšããŠlogstash-rabbitmqã¯ã©ã¹ã¿ãŒãéå§ããŸã
ãªããžããªãè€è£œã§ããŸã
docker-compose -f docker-compose-rabbitmq.yml up
ãããŠãããªãã¯äœ¿çšããããšãã§ããŸã
./event-generate.sh
logstashã«éä¿¡ããã倿°ã®ã©ã³ãã ã€ãã³ããçæããŸã ã
, , , logstash . rabbitmq-output-plugin , :
output { rabbitmq { exchange => logstash exchange_type => direct host => rabbitmq key => my_app } }
RabbitMQ JMS , AMQP , .

(exchange) .
'routing-key', , . , ' logstash. ''
AMQP
. Spring
c RabbitMq
@Bean ConnectionFactory connectionFactory() { return new CachingConnectionFactory(host, port); } @Bean RabbitAdmin rabbitAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.declareQueue(queue()); rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange())); return rabbitAdmin; } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Queue queue() { return new Queue(queueName, false); } DirectExchange exchange() { return new DirectExchange("logstash"); } private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("my_app"); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, new MessageConverter() { public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { throw new RuntimeException("Unsupported"); } public String fromMessage(Message message) throws MessageConversionException { try { return new String(message.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UnsupportedEncodingException"); } } }); messageListenerAdapter.setDefaultListenerMethod("receive");
'logstash', 'my_app'. MessageListenerAdapter , 'receive' Receiver
, .
, , hot observable
, , , PublishSubject .
public class Receiver { private PublishSubject<JsonObject> publishSubject = PublishSubject.create(); public Receiver() { } public void receive(Object message) { log.info("Received remote message {}", message); JsonElement remoteJsonElement = gson.fromJson ((String) message, JsonElement.class); JsonObject jsonObj = remoteJsonElement.getAsJsonObject(); publishSubject.onNext(jsonObj); } public PublishSubject<JsonObject> getPublishSubject() { return publishSubject; } }
, SimpleMessageListenerContainer , ( ). Observable , ( onNext
, onComplete
, onError ):
// Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); // // Observable<String> obs1 = Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); }); Observable<String> obs2 = Observable.create(s -> { // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); Observable<String> c = Observable.merge(obs1, obs2);
Observable.serialize()
Subject.toSerialized()
, 1 Thread
ListenerContainer
, . , Subjects
, . .
ãã®é·ãããŒãIIæçš¿ïŒããŒã2ïŒã®ç¶ããšããŠã³ãŒããšãªããžããªã確èªããããRx Playgroundã«ã¢ã¯ã»ã¹ããŠãããã«å€ãã®äŸãèŠã€ããããšãã§ããŸãã翻蚳è
ã®ãµã€ããžã®ãªã³ã¯