If you are new to RxJava, or have tried to get to grips with it but never quite finished the job, you will find something new for yourself below.
The original article was written on 29 November 2017. This is a free translation.
At GO-JEK, we need to perform a large number of asynchronous operations in our apps, and we cannot afford to compromise on the speed and smoothness of the user interface.
Writing complex multi-threaded Android apps can be a very time-consuming process. You have to take care of many interrelated things, and at times it gets overwhelming. This, along with many other reasons, is why we came to use RxJava in the Android apps we develop.
In this article we discuss how to use the actual multi-threading capabilities of RxJava to make the app development process as simple, easy, and fun as possible. All code examples below use RxJava 2, but the concepts described apply to other Reactive Extensions as well.
Why reactive programming?
Every article on reactive programming starts with this obligatory section, and we will not break the tradition. Building Android apps with a reactive approach has several benefits. Let's focus on the ones that really matter.
No more callbacks
If you have been developing for Android for a while, you must have noticed how quickly things get complicated and out of hand with nested callbacks.
This happens when you perform several asynchronous operations in sequence and each further action depends on the result of the previous one. Very soon the code becomes bloated, convoluted, and impossible to maintain.
Simple error handling
In the imperative world, when many complex asynchronous operations are in flight, errors can occur in many places. You have to handle them in every one of those places, which results in a lot of repetitive boilerplate and unwieldy methods.
Very easy multi-threading
We all know (and secretly admit) how complicated multi-threading in Java can get at times. Take running a piece of code on a background thread and delivering the result back to the main thread. It sounds simple, but in practice there are plenty of pitfalls to avoid.
RxJava makes it very easy to perform several complex operations on any thread of your choice, while it maintains proper synchronization and lets you switch threads seamlessly.
The benefits of RxJava are endless. We could talk about them for hours and bore you, but instead let's dig deeper and explore how multi-threading actually works in RxJava.
RxJava is not multi-threaded by default
Yes, you read that right. RxJava is not multi-threaded by default. The definition given for RxJava on the official website reads:
"A library for composing asynchronous and event-based programs using observable sequences for the Java VM."
Seeing the word "asynchronous", many people wrongly assume that RxJava is multi-threaded by default. Yes, RxJava supports multi-threading and offers many powerful features for working with asynchronous operations with ease, but that does not mean its default behavior is multi-threaded.
If you have already used RxJava a little, you know its basic building blocks:
- a source Observable, followed by
- several operators
- a target subscriber
Observable.just(1, 2, 3, 4, 5)
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                println("Emitting item on: " + currentThread().getName());
            }
        })
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer integer) throws Exception {
                println("Processing item on: " + currentThread().getName());
                return integer * 2;
            }
        })
        .subscribeWith(new DisposableObserver<Integer>() {
            @Override
            public void onNext(@NonNull Integer integer) {
                println("Consuming item on: " + currentThread().getName());
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
If you run this sample, you will clearly see that all the actions happen on the application's main thread (follow the thread names in the console log). The example shows that the default behavior of RxJava is blocking: everything executes on the same thread the code was called from.
Bonus: wondering what doOnNext() does? It is nothing more than a side-effect operator. It lets you step into the chain of observables and perform impure operations, for example inserting extra code into the call chain for debugging.
A simple example
To start working with multi-threading in RxJava, you need to be familiar with the basic classes and methods, such as Schedulers and observeOn()/subscribeOn().
Let's look at one of the simplest examples. Say we want to fetch a list of Book objects with a network request and show it on the application's main thread. It is a fairly common and easy-to-follow example to start with.
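getBooks()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<Book>() {
            @Override
            public void onNext(@NonNull Book book) {
                // each Book arrives here, on the main thread
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });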
Here we have a getBooks() method that makes a network call and collects a list of books. A network call takes time (from a few milliseconds to a few seconds), so we use subscribeOn() with the Schedulers.io() scheduler to perform the operation on an I/O thread.
To process the result on the main thread and show the list of books in the app's user interface, we use the observeOn() operator with the AndroidSchedulers.mainThread() scheduler.
Don't worry, we will get to more advanced things shortly. This example was only meant to recall the basic concepts before diving deeper.
Befriending the schedulers
RxJava provides a powerful set of schedulers. You do not access or manage threads directly. If you need to work with threads, you do it through the built-in schedulers.
You can think of a scheduler as a thread or a thread pool (a collection of threads) for a particular kind of task.
Put simply, if you need to run a task on a separate thread, you pick the right scheduler, which takes a thread from its pool of available threads and runs the task on it.
RxJava comes with several kinds of schedulers. The tricky part is choosing the right scheduler for the job. A task will never run optimally unless you pick an appropriate scheduler. Let's go through each of them.
Schedulers.io()
This scheduler is backed by an unbounded thread pool and is meant for I/O-intensive work that does not load the CPU, such as accessing the file system, making network calls, or talking to a database. The number of threads in this scheduler is unlimited and grows as needed.
Schedulers.computation()
This scheduler is meant for CPU-intensive work, such as processing large data sets or images. It is backed by a bounded thread pool whose size equals the number of available processors. Because this scheduler is suited only to CPU-intensive work, the number of threads is limited, so that threads do not compete for processor time or end up starved.
Schedulers.newThread()
This scheduler creates a completely new thread every time it is used, so you get none of the benefits of a thread pool. Threads are expensive to create and tear down. Be careful not to spawn threads excessively, as this can slow the system down and lead to out-of-memory errors.
A new thread is created for each unit of work scheduled on it (in practice, for each subscription that goes through this scheduler) rather than being reused from a pool.
Ideally, use this scheduler rarely, mainly to move a long-running, isolated part of the program onto a separate thread.
Schedulers.single()
This scheduler is backed by a single thread that executes tasks sequentially. It is very useful when you have a set of background tasks in different places of your app but cannot allow more than one of them to run at the same time.
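To make the sequential guarantee concrete, here is a minimal sketch that uses only the JDK, not RxJava. It assumes that a single-threaded executor is a fair stand-in for the one worker thread behind Schedulers.single(); the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: a JDK single-thread executor standing in for Schedulers.single().
class SingleThreadSketch {

    /** Queues `count` tasks on one worker thread and returns the order in which they ran. */
    static List<Integer> runInOrder(int count) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int id = i;
            single.execute(() -> order.add(id)); // tasks wait in a FIFO queue
        }
        single.shutdown(); // stop accepting new tasks, let the queued ones finish
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because there is only one worker, no two tasks ever overlap, which is exactly the property you want from tasks that must not run at the same time.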
Schedulers.from(Executor executor)
This scheduler is backed by an Executor of your own. Situations may arise in which certain tasks need to run on a scheduler with your own custom thread-allocation logic.
Say you want to limit the number of parallel network calls your app makes. You can create a custom scheduler backed by a thread pool of bounded size, Schedulers.from(Executors.newFixedThreadPool(n)), and use it everywhere network calls are made.
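The bounding itself comes from the executor, so it can be observed without RxJava. The sketch below is JDK-only and illustrative: the class name, the method name, and the 20 ms sleep that stands in for a network call are all assumptions. In an RxJava app, this is the kind of executor you would hand to Schedulers.from().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the executor you might pass to Schedulers.from(...).
class BoundedPoolSketch {

    /** Runs `tasks` fake network calls on a pool of `poolSize` threads; returns the peak number in flight. */
    static int peakConcurrency(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                try {
                    Thread.sleep(20); // stand-in for a slow network call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 12 calls are submitted, but never more than 3 run at the same time.
        System.out.println("peak = " + peakConcurrency(3, 12));
    }
}
```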
AndroidSchedulers.mainThread()
This is a special scheduler that is not part of the RxJava library. To get access to it, you need the RxAndroid extension library. This scheduler is useful in Android apps for performing actions on the UI thread.
By default it enqueues jobs on the Looper associated with the main thread, but this can be overridden with AndroidSchedulers.from(Looper looper).
Note: be careful with schedulers backed by an unbounded thread pool, such as Schedulers.io(). There is always a risk that the number of threads grows without limit.
Understanding subscribeOn() and observeOn()
Now that you understand the types of schedulers, let's look at subscribeOn() and observeOn() in detail.
To work with multi-threading in RxJava like a pro, you need a deep understanding of how each of these two operators works on its own.
subscribeOn()
Put simply, this operator tells the source observable which thread to emit items on. It is important to understand the word "source" here. When you have a chain of observables, the source observable is always the root, the very top of the chain, where events are generated.
As we have already seen, if we do not use subscribeOn(), all events occur on the thread the code was called from (the main thread in our case).
Let's redirect the events to a computation thread using subscribeOn() with the Schedulers.computation() scheduler. When you run the following code sample, you will see the events occurring on one of the computation threads available in the pool, RxComputationThreadPool-1.
To keep the code short, we will not override every DisposableObserver method, since we have no need for onError() and onComplete() here. We will use doOnNext() and lambdas instead.
Observable.just(1, 2, 3, 4, 5, 6)
        .subscribeOn(Schedulers.computation())
        .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
        .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
It does not matter where in the call chain you put subscribeOn(). It acts only on the source observable and controls which thread the source emits events on.
In the following example, other observables are created after the source observable (by the map() and filter() methods), and the subscribeOn() operator sits at the very end of the call chain. Yet as soon as you run the code, you will notice that all events occur on the thread specified in subscribeOn().
This becomes even clearer once observeOn() is added to the call chain. And even if we place subscribeOn() below observeOn(), the logic does not change: subscribeOn() acts only on the source observable.
Observable.just(1, 2, 3, 4, 5, 6)
        .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
        .map(integer -> integer * 3)
        .filter(integer -> integer % 2 == 0)
        .subscribeOn(Schedulers.computation())
        .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
It is also important to understand that you cannot use subscribeOn() several times in the same call chain. You can of course write it more than once, but the extra calls change nothing. The example below calls three different schedulers one after another. Can you guess which scheduler is in effect when it runs?
Observable.just(1, 2, 3, 4, 5, 6)
        .subscribeOn(Schedulers.io())
        .subscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.newThread())
        .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
        .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
If you answered Schedulers.io(), you are right! No matter how many times it is called, only the first subscribeOn() after the source observable takes effect.
Under the hood
It is worth spending a little more time studying this example in detail. Why does only the Schedulers.io() scheduler take effect? Usually everyone expects Schedulers.newThread() to win, since it comes last in the chain.
You need to understand that in RxJava a subscription is always made to the upstream Observable instance. The code below should help you see this. It is the example we looked at earlier, just written out more verbosely.
Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0);
Observable<Integer> o3 = o2.map(integer -> integer * 10);
o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
To understand how it all works, let's start analysing from the last line of the example. There the target subscriber calls the subscribe() method on the observable o3, which implicitly calls subscribe() on its parent observable o2. The observer implementation supplied by o3 multiplies the emitted numbers by 10.
The process repeats: o2 implicitly calls subscribe() on o1, passing an observer implementation that lets only even numbers through. We have now reached the root (o1), which has no parent to call subscribe() on. At this point the chain of observables is complete, and the source observable starts emitting items.
This should make the concept of subscriptions in RxJava clear. By now you should understand how chains of observables are formed and how events propagate from the source observable.
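The upstream walk can be modelled in a few lines of plain Java. The sketch below is a toy and deliberately not RxJava: MiniSource, its just() and subscribeOn() methods, and the thread names are all invented for illustration. It only assumes that subscribeOn() amounts to "run the upstream subscribe() call on this executor". Under that assumption, the subscribeOn() nearest the source is the last one to run, so it decides where emission happens.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model for illustration only; MiniSource is a made-up type, not part of RxJava.
class SubscribeOnSketch {

    interface MiniSource {
        void subscribe(Consumer<Integer> downstream);

        /** Root of the chain: emits on whichever thread calls subscribe(). */
        static MiniSource just(int... items) {
            return downstream -> {
                for (int item : items) {
                    downstream.accept(item);
                }
            };
        }

        /** Models subscribeOn(): moves the upstream subscribe() call onto the executor. */
        default MiniSource subscribeOn(Executor executor) {
            MiniSource upstream = this;
            return downstream -> executor.execute(() -> upstream.subscribe(downstream));
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Chains three subscribeOn() calls and reports the thread the items were emitted on. */
    static String emittingThread() {
        ExecutorService io = named("io");
        ExecutorService computation = named("computation");
        ExecutorService fresh = named("new-thread");
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            MiniSource.just(1, 2, 3)
                    .subscribeOn(io)           // closest to the source
                    .subscribeOn(computation)
                    .subscribeOn(fresh)        // subscription starts here and walks upstream
                    .subscribe(item -> seen.complete(Thread.currentThread().getName()));
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            computation.shutdown();
            fresh.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread()); // prints io
    }
}
```

The subscribe() call hops from new-thread to computation to io on its way up, and the source then emits on the last thread it landed on.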
observeOn()
As we have seen, subscribeOn() instructs the source observable to emit items on a particular thread, and that thread is responsible for pushing the items all the way down to the Subscriber. So by default the subscriber receives the processed items on that same thread.
But this may not be the behavior you want. Say you want to fetch some data from the network and show it in the user interface.
You need to do two things:
- make the network call on a non-blocking I/O thread
- receive the result on the application's main thread
You have an Observable that makes the network call on an I/O thread and passes the result down to the subscriber. If you use only subscribeOn(Schedulers.io()), the target subscriber processes the result on that same I/O thread. That is bad luck, because on Android the user interface can only be touched from the main thread.
What we need now is to switch threads, and for that we use observeOn(). When observeOn() is encountered in the call chain, the items emitted by the source observable are immediately handed over to the thread specified in observeOn().
getIntegersFromRemoteSource()
        .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
In this made-up example we watch integers being fetched from the network and passed on by the source observable. In a real case this could be any other asynchronous operation, such as reading a large file or fetching data from a database. Just run the example and see the results for yourself by following the console log.
Now let's consider a more complex example in which observeOn() is called several times to switch threads while the data is being processed.
getIntegersFromRemoteSource()
        .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map(integer -> {
            println("Mapping item " + integer + " on: " + currentThread().getName());
            return integer * integer;
        })
        .observeOn(Schedulers.newThread())
        .filter(integer -> {
            println("Filtering item " + integer + " on: " + currentThread().getName());
            return integer % 2 == 0;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
In the example above, subscribeOn() is used with Schedulers.io(), so the source observable emits items into the operator chain on an I/O thread. Next we want to transform each item with the map() operator, but we want to do that on a computation thread. For this we use observeOn() with Schedulers.computation() just before calling map(), which switches threads and hands the items over to the target computation thread.
In the next step we want to filter out some items, and for some reason we want to do this on a brand-new thread. We use observeOn() again, this time paired with Schedulers.newThread(), before calling the filter() operator, so that the items are handed over to the new thread.
Finally, we want the subscriber to receive the results on the UI thread. For this we use observeOn() with the AndroidSchedulers.mainThread() scheduler.
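If RxJava is not at hand, the same "fetch here, transform there, consume somewhere else" hand-off can be tried with the JDK alone. The sketch below is only an analogy and handles a single value rather than a stream: CompletableFuture's thenApplyAsync(fn, executor) and thenAcceptAsync(fn, executor) play the role that observeOn() plays in the walkthrough above. The class name, the thread names, and the value 7 are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only analogy, not RxJava: each *Async(fn, executor) call hands the value to another thread.
class StageHoppingSketch {

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Runs fetch -> square -> consume on three named threads and returns what ran where. */
    static List<String> run() {
        ExecutorService io = named("io");
        ExecutorService computation = named("computation");
        ExecutorService ui = named("ui");
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    .supplyAsync(() -> {
                        trace.add("fetch on " + Thread.currentThread().getName());
                        return 7; // pretend this value came from the network
                    }, io)
                    .thenApplyAsync(n -> {
                        trace.add("square on " + Thread.currentThread().getName());
                        return n * n;
                    }, computation)
                    .thenAcceptAsync(n ->
                            trace.add("consume " + n + " on " + Thread.currentThread().getName()), ui)
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            computation.shutdown();
            ui.shutdown();
        }
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [fetch on io, square on computation, consume 49 on ui]
    }
}
```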
But what happens if observeOn() is used several times in a row? On which thread will the subscriber receive the results in the following example?
getIntegersFromRemoteSource()
        .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.single())
        .observeOn(Schedulers.computation())
        .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
If you run this example, you will see the subscriber receiving the items on the computation thread RxComputationThreadPool-1. That means the last observeOn() call took effect. Wondering why?
Under the hood
You may have guessed it already. As we know, subscription travels upstream through every Observable in the chain, but with emissions everything happens the other way round, that is, in the usual order in which the code is written. Calls originate at the source observable and travel down the call chain until they reach the subscriber.
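The downstream direction can be modelled with a toy in plain Java. As before, this is not RxJava: MiniSource, its observeOn() method, and the thread names are invented, and the sketch assumes that observeOn() boils down to "hand every item to this executor before passing it further down". Each item is then re-routed once per observeOn(), and the subscriber runs on whichever executor received it last.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model for illustration only; MiniSource is a made-up type, not part of RxJava.
class ObserveOnSketch {

    interface MiniSource {
        void subscribe(Consumer<Integer> downstream);

        /** Root of the chain: emits on whichever thread calls subscribe(). */
        static MiniSource just(int... items) {
            return downstream -> {
                for (int item : items) {
                    downstream.accept(item);
                }
            };
        }

        /** Models observeOn(): every item is handed to the executor before going further downstream. */
        default MiniSource observeOn(Executor executor) {
            MiniSource upstream = this;
            return downstream -> upstream.subscribe(
                    item -> executor.execute(() -> downstream.accept(item)));
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Chains three observeOn() calls and returns the thread each item was consumed on. */
    static List<String> consumingThreads() {
        ExecutorService ui = named("ui");
        ExecutorService single = named("single");
        ExecutorService computation = named("computation");
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        try {
            MiniSource.just(1, 2, 3)
                    .observeOn(ui)
                    .observeOn(single)
                    .observeOn(computation) // last hand-off before the subscriber
                    .subscribe(item -> {
                        seen.add(Thread.currentThread().getName());
                        done.countDown();
                    });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ui.shutdown();
            single.shutdown();
            computation.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(consumingThreads()); // prints [computation, computation, computation]
    }
}
```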
The observeOn() operator always works in the downstream direction, so the threads are switched one after another, and the last switch is to the computation thread (observeOn(Schedulers.computation())). So whenever you need to switch threads in order to process data on a new thread, call observeOn() first and then process the items. Synchronization, race conditions, and many other difficulties of multi-threading are all taken care of for you by RxJava.
Conclusion
By now you should have a pretty good idea of how to use RxJava properly to build multi-threaded apps that keep the user interface fast and smooth.
If it has not all sunk in right away, that is fine. Read the article again and play with the code samples. There are plenty of nuances to take in, so take your time.