
Lately I keep hearing about reactive programming and seeing all sorts of buzzwords such as message-driven architecture, event sourcing, and CQRS. Unfortunately, rather little has been written about them on Habr, so I decided to remedy the situation and share what I know with everyone.
In this article we will learn about the key traits of reactive applications and look at how the CQRS and Event Sourcing patterns help to build them. To keep things from getting boring, we will build, step by step, our own messenger with web sockets and actors that follows all the canons of reactive programming. To implement all this we will use the wonderful Scala language together with the equally excellent Akka library, which implements the actor model. We will also use Play Framework to write the web part of the application. So, let's get started.
This article is aimed at readers who already know Scala and have heard of the actor model. Everyone else is encouraged to read on as well: the principles of reactive programming apply regardless of language or framework.
What is reactive programming?
The ideas behind reactive programming are set out in the Reactive Manifesto, www.reactivemanifesto.org. A translation of its first version has already been published on Habr; the second version differs from it slightly. Here is a brief digest of the second version. According to the manifesto, a reactive application has several important properties.
Responsive
The application responds as quickly as it possibly can. Responsiveness is the cornerstone of usability and utility, for the simple reason that long delays in an interface make nobody want to use it. Responsive systems focus on providing rapid and consistent response times, establishing reliable upper bounds so that they deliver a consistent quality of service. This consistent, predictable behaviour in turn simplifies error handling, builds user confidence, and encourages further interaction.
Resilient
The application stays responsive in the face of failure. This applies not only to highly available, mission-critical systems: any system that is not resilient will be unresponsive after a failure. Resilience is achieved through replication, containment, isolation, and delegation. Failures are contained within each module, and isolating modules from one another ensures that parts of the application can fail and recover without bringing down the application as a whole. Recovery of each failed module is delegated to another, external module, and high availability is ensured by replication. The clients of a module are not burdened with handling its failures.
Elastic
The application stays responsive under varying load. A reactive application reacts to changes in load by increasing or decreasing the resources allocated to handle it. This implies an architecture with no contention points or central bottlenecks, which shows up as the ability to shard or replicate modules and distribute the load among them. The result is cost-effective scalability on commodity hardware (hello, Google!).
Message Driven
Reactive applications rely on asynchronous message passing to establish boundaries between modules; this ensures loose coupling, isolation, and location transparency, and provides the means to delegate failures as messages. Explicit message passing enables load management, elasticity, and flow control by shaping and monitoring the message queues and applying back-pressure when necessary. Location transparency makes it possible to use the same failure-handling logic on a cluster and on a single node. Non-blocking communication lets recipients consume resources only while they are active, which reduces the application's runtime overhead.
CQRS
CQRS stands for Command Query Responsibility Segregation. Unlike the widely used CRUD approach (create, retrieve, update, delete), this way of structuring an application means that different models can be used for updating information and for reading it. The obvious question is why anyone would need such a contortion. The answer is that, because the read model and the write model are separate, each can be optimised for its own task. If denormalised data suits your reads, nobody minds. If the data is more convenient to read from a graph database, go ahead. If you want to keep everything in a key-value store, be my guest. Moreover, when you add a new feature to the read model, all you have to do afterwards is regenerate the model. (One caveat: with many gigabytes of events this process is not that fast. Snapshots help here, because they speed up recovery considerably.)
For the same reason you need not worry about normalising the read model at all. We will use CQRS to optimise the application's read operations and so keep it responsive. What else does it take to make the application truly reactive? That's right: elasticity and resilience. We will get those properties with the Event Sourcing pattern.
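The split between the two models can be illustrated with a minimal sketch. It is not part of the messenger; the names `WriteModel`, `MessagePosted`, `PostCounts`, and `rebuild` are all hypothetical, and the "storage" is just an in-memory vector. The write side only appends facts, while the read side is a denormalised view shaped for one query and can be regenerated from the log at any time.

```scala
// Illustrative only: every name in this object is an assumption, not code from the article.
object CqrsSketch {
  // Write side: an append-only log of facts.
  final case class MessagePosted(author: String, content: String)

  final class WriteModel {
    private var log = Vector.empty[MessagePosted]

    def post(author: String, content: String): MessagePosted = {
      val event = MessagePosted(author, content)
      log :+= event
      event
    }

    def events: Vector[MessagePosted] = log
  }

  // Read side: a denormalised view built for a single query,
  // "how many messages has each author posted?".
  final case class PostCounts(byAuthor: Map[String, Int] = Map.empty) {
    def applyEvent(e: MessagePosted): PostCounts =
      copy(byAuthor = byAuthor.updated(e.author, byAuthor.getOrElse(e.author, 0) + 1))
  }

  // A read model can always be rebuilt from scratch by replaying the log.
  def rebuild(events: Seq[MessagePosted]): PostCounts =
    events.foldLeft(PostCounts())((view, e) => view.applyEvent(e))
}
```

Adding a second view (say, the latest message per author) would mean writing another fold over the same events, without touching the write side.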
Event Sourcing
The point of ES is to store not the current state of the data model but the entire history of the changes that modified the application's state (in practice not every change, only the significant ones). To obtain the current state, you simply fold together the changes from all the stored events. What do we mean by an event, and how does an event differ from a command? A command expresses what someone wants from us, and we are free to ignore it. An event is something that has already happened: an immutable fact.
The beauty of this approach is that nothing is ever deleted or modified. As you may have guessed, this gives us excellent opportunities for scaling the application, and the database can be a proven NoSQL solution such as Cassandra or HBase. Event Sourcing gives us resilience and elasticity.
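To make the command/event distinction concrete, here is a small sketch in plain Scala. It is an illustration under assumed names (`Rename`, `UserJoined`, `decide`, `evolve`, `replay` are all hypothetical) and has nothing to do with Akka's API: a command may be rejected, an event is simply applied, and the current state is a left fold over the history.

```scala
// Illustrative only: the domain and all names are assumptions made for this sketch.
object EventSourcingSketch {
  // A command is a request and may be rejected...
  final case class Rename(userId: Int, newName: String)

  // ...an event is a fact that has already happened and is never changed.
  sealed trait Event
  final case class UserJoined(userId: Int, name: String) extends Event
  final case class UserRenamed(userId: Int, name: String) extends Event
  final case class UserLeft(userId: Int) extends Event

  type State = Map[Int, String] // userId -> current name

  // Deciding: validate a command against the current state.
  def decide(state: State, cmd: Rename): Either[String, Event] =
    if (!state.contains(cmd.userId)) Left("unknown user")
    else if (state.values.exists(_ == cmd.newName)) Left("name taken")
    else Right(UserRenamed(cmd.userId, cmd.newName))

  // Evolving: apply one event to the state. Events are never rejected.
  def evolve(state: State, event: Event): State = event match {
    case UserJoined(id, name)  => state + (id -> name)
    case UserRenamed(id, name) => state + (id -> name)
    case UserLeft(id)          => state - id
  }

  // The current state is a left fold over the whole history.
  def replay(history: Seq[Event]): State =
    history.foldLeft(Map.empty[Int, String])(evolve)
}
```

Because `evolve` is the only place where state changes, replaying the same history always yields the same state, which is what makes recovery after a crash straightforward.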
Stop talking, show me the code
So, as mentioned above, we will implement all of this on the Typesafe stack.
The architecture of the application looks like this.
Users can read and send messages. Messages travel over a web socket that is handled by a UserConnection actor. That actor forwards messages to the RoomWriter actor, which writes them to the journal and also pokes the RoomReader actor. RoomReader reads messages from the journal and sends them back to the UserConnection actor. On top of all this there is a Receptionist actor, which hands out names and guarantees that no two users in the application share the same name. Now that we more or less understand the architecture, let's start writing code.
RoomWriter
The first thing we implement is the actor that writes incoming messages to the journal.
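RoomWriter class code
    import akka.actor.{Actor, ActorRef, Terminated}
    import akka.persistence.PersistentActor
    import scala.collection.mutable

    class RoomWriter(roomLogId: String) extends PersistentActor {
      import RoomWriter._

      // Identifies this actor's events in the journal.
      override def persistenceId = roomLogId

      // Actors to notify whenever the journal changes.
      val listeners = mutable.Set.empty[ActorRef]

      // The writer keeps no state derived from events, so replay is ignored.
      def receiveRecover = Actor.emptyBehavior

      def receiveCommand = {
        case msg: Message    => persistAsync(msg) { _ => listeners foreach (_ ! Update) }
        case Listen(ref)     => listeners add context.watch(ref)
        case Terminated(ref) => listeners remove ref
      }
    }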
What is written here? As you may have guessed, we declared a RoomWriter class with three parts:
- The identifier persistenceId, which is needed to uniquely identify the events produced by this actor.
- The listeners set, which holds references to the actors that should be notified when something changes in the journal.
- Two methods: receiveRecover is called while messages are replayed from the journal, which happens when the actor is created, and receiveCommand handles messages during normal operation.
Let's look at receiveCommand more closely. It handles three different messages:
- When a message of type Message arrives, it is written to the journal asynchronously, and then every listener is sent a message saying that the journal has been updated.
- When Listen arrives, we start watching the lifecycle of the actor whose reference came in the message and add that reference to the set of listeners.
- Terminated carries a reference to a dead actor and arrives when an actor whose lifecycle we are watching dies unexpectedly. When that happens (the user has closed the browser), we remove the actor from the notification list.
It is good practice to declare all the messages an actor handles, along with a factory method that creates the actor, in its companion object.
RoomWriter companion object code
    import akka.actor.{ActorRef, Props}

    object RoomWriter {
      case class Listen(ref: ActorRef)
      case class Message(author: String, content: String, time: Long)
      case object Update

      def props(roomId: String) = Props(new RoomWriter(roomId))
    }
That covers RoomWriter. Now let's look at the RoomReader actor, which receives updates from the journal and passes them up the hierarchy.
RoomReader
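RoomReader class code
    import akka.actor.ActorRef
    import akka.persistence.PersistentView

    // PersistentView is the API of the Akka version used here; it was deprecated in Akka 2.4.
    class RoomReader(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) extends PersistentView {
      import RoomWriter._
      import RoomReader._ // currentTime, tenMinutes

      // Subscribe to the writer's "journal updated" notifications.
      roomWriter ! Listen(self)

      override def persistenceId = roomLogId
      override def viewId = roomLogId + "-view"

      def receive = {
        // Forward only messages sent within the last ten minutes.
        case msg @ Message(_, _, sendingTime) if currentTime - sendingTime < tenMinutes =>
          userConnection ! msg
        case msg: Message => // too old, drop it
        case Update       => self ! akka.persistence.Update()
      }
    }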
RoomReader depends on the identifier of the journal from which it receives updates. Here that identifier is the same as the RoomWriter actor's, so everything RoomWriter writes to the journal reaches RoomReader. Let's see how messages are handled:
- When a Message arrives, its sending time is checked; if the message is more than ten minutes old, it is not shown to the user. This keeps users from being flooded with thousands of previously accumulated messages.
- When Update arrives, the actor reads the journal and sends the messages it has read to the user.
As before, there is a companion object:
RoomReader companion object code
    import akka.actor.{ActorRef, Props}
    import scala.concurrent.duration._

    object RoomReader {
      def currentTime = System.currentTimeMillis()
      val tenMinutes = Duration(10, MINUTES).toMillis

      def props(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) =
        Props(new RoomReader(roomLogId, roomWriter, userConnection))
    }
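The ten-minute rule in RoomReader's guard can also be written as a pure function, which makes the boundary easy to check. The sketch below is an assumption-laden illustration: `FreshnessSketch`, `isFresh`, and `visibleTo` are hypothetical names, `Message` is redeclared locally so the block stands alone, and the clock is passed in as a parameter instead of being read from the system.

```scala
import scala.concurrent.duration._

// Illustrative only: a stand-alone restatement of the "younger than ten minutes" rule.
object FreshnessSketch {
  final case class Message(author: String, content: String, time: Long)

  val maxAge: Long = 10.minutes.toMillis

  // `now` is a parameter (rather than a call to the system clock) so the rule can be tested.
  def isFresh(msg: Message, now: Long): Boolean = now - msg.time < maxAge

  // What a newly connected user would see after the journal has been replayed.
  def visibleTo(replayed: Seq[Message], now: Long): Seq[Message] =
    replayed.filter(isFresh(_, now))
}
```

A message that is exactly ten minutes old is already filtered out, because the comparison is strict, just as in the actor's guard.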
Now let's turn to the most interesting actor, UserConnection, which handles the messages coming from the web socket.
UserConnection
UserConnection class code
    import akka.actor.{Actor, ActorRef}

    class UserConnection(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) extends Actor {
      import actors.UserConnection._
      import RoomWriter.Message // assumes RoomWriter lives in the same package

      def receive = waitingForUsername

      // Initial behaviour: only name registration is handled.
      def waitingForUsername: Receive = {
        case WebSocketInMsg(RegisterMeWithName, username) =>
          receptionist ! UsernameRequest(username)
        case Ack(username) =>
          context become readyToChat(username)
          context actorOf RoomReader.props(roomLogId, roomWriter, self)
          out ! WebSocketOutMsg(currentTime, "system", "welcome")
        case NAck =>
          out ! WebSocketOutMsg(currentTime, "system", "taken")
      }

      // After registration: relay messages between the socket and the room.
      def readyToChat(username: String): Receive = {
        case WebSocketInMsg(SendMessage, message) =>
          roomWriter ! Message(username, message, currentMillis)
        case Message(author, content, time) =>
          out ! WebSocketOutMsg(formatTime(time), author, content)
      }
    }
This actor has one feature that sets it apart from the others: it can change its behaviour and state. Initially it waits for a username. In that state it accepts the client's requests for a name and forwards them to the actor responsible for handing out names. Once a name has been granted, the actor moves to the ready-to-chat state and starts relaying messages between the parts of the system.
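The behaviour switch can be modelled without Akka as a function from an input to some outputs plus the next behaviour, which is roughly the role `context.become` plays for an actor. Everything in the following sketch is hypothetical (`Behavior`, `Register`, `Send`, `run`, and the string outputs are stand-ins for the real messages); it only mirrors the two states described above.

```scala
// Illustrative only: a plain-Scala model of an actor that switches behaviour.
object BehaviorSketch {
  sealed trait In
  final case class Register(name: String) extends In
  final case class Ack(name: String) extends In
  case object NAck extends In
  final case class Send(text: String) extends In

  // A behaviour handles one input and returns the outputs plus the next behaviour.
  trait Behavior { def on(in: In): (List[String], Behavior) }

  object WaitingForUsername extends Behavior {
    def on(in: In): (List[String], Behavior) = in match {
      case Register(name) => (List(s"ask receptionist for $name"), this)
      case Ack(name)      => (List("welcome"), ReadyToChat(name))
      case NAck           => (List("taken"), this)
      case Send(_)        => (Nil, this) // not registered yet: the message is dropped
    }
  }

  final case class ReadyToChat(username: String) extends Behavior {
    def on(in: In): (List[String], Behavior) = in match {
      case Send(text) => (List(s"$username: $text"), this)
      case _          => (Nil, this)
    }
  }

  // Feed a sequence of inputs through the state machine, collecting all outputs.
  def run(inputs: List[In]): List[String] = {
    val start: (List[String], Behavior) = (Nil, WaitingForUsername)
    val (outputs, _) = inputs.foldLeft(start) { case ((out, behavior), in) =>
      val (emitted, next) = behavior.on(in)
      (out ++ emitted, next)
    }
    outputs
  }
}
```

The chat messages sent before registration never reach the room, because the first behaviour simply has no case that forwards them.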
This time the companion object turned out rather large.
UserConnection companion object code
    import java.util.Locale
    import akka.actor.{ActorRef, Props}
    import org.joda.time.DateTime
    import org.joda.time.format.DateTimeFormat

    object UserConnection {
      def props(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) =
        Props(new UserConnection(receptionist, roomWriter, out, roomLogId))

      // Wire format of the web-socket frames.
      case class WebSocketInMsg(messageType: Int, messageText: String)
      case class WebSocketOutMsg(time: String, from: String, messageText: String)

      // Name-registration protocol spoken with the Receptionist.
      case class UsernameRequest(name: String)
      case class Ack(username: String)
      case object NAck

      // Values of WebSocketInMsg.messageType.
      val RegisterMeWithName = 0
      val SendMessage = 1

      val formatter = DateTimeFormat.forPattern("HH:mm:ss").withLocale(Locale.US)
      def currentTime = DateTime.now().toString(formatter)
      def currentMillis = System.currentTimeMillis()
      def formatTime(timeStamp: Long) = new DateTime(timeStamp).toString(formatter)
    }
The last actor on our list is the Receptionist.
Receptionist
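Receptionist class code
    import akka.actor.{Actor, Terminated}
    import scala.collection.mutable

    class Receptionist extends Actor {
      import UserConnection._

      // name -> the connection actor that owns it; "system" is reserved.
      val takenNames = mutable.Map("system" -> self)

      def receive = {
        case UsernameRequest(username) =>
          if (takenNames contains username) {
            sender() ! NAck
          } else {
            takenNames += (username -> context.watch(sender()))
            sender() ! Ack(username)
          }
        case Terminated(ref) =>
          // Free the name held by the actor that has just died.
          takenNames collectFirst { case (name, actor) if actor == ref => name } foreach takenNames.remove
      }
    }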
Its job is to hand out names to users. It holds an associative array that maps each name to an ActorRef. As in RoomWriter, we watch the lifecycle of every actor we have given a name to, and when one dies we remove its name from the list of registered names.
Don't forget the companion object, where we put the factory method for creating the actor.
Receptionist companion object code
    import akka.actor.Props

    object Receptionist {
      def props() = Props[Receptionist]
    }
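The receptionist's bookkeeping is simple enough to express as an immutable value, which can be handy for reasoning about it apart from the actor. This is only a sketch under assumptions: `Registry`, `request`, and `release` are hypothetical names, connections are identified by plain `Int`s instead of `ActorRef`s, and `0` stands in for the receptionist itself.

```scala
// Illustrative only: the name table from the Receptionist, without any actors.
object NameRegistrySketch {
  // Connections are plain Ints here; the actor version stores ActorRefs.
  final case class Registry(taken: Map[String, Int] = Map("system" -> 0)) {
    // Returns the updated registry and whether the name was granted (Ack) or refused (NAck).
    def request(name: String, conn: Int): (Registry, Boolean) =
      if (taken.contains(name)) (this, false)
      else (copy(taken = taken + (name -> conn)), true)

    // Called when a connection goes away (Terminated in the actor version).
    def release(conn: Int): Registry =
      copy(taken = taken.filterNot { case (_, owner) => owner == conn })
  }
}
```

Since the actor processes one message at a time, the check and the insertion in `request` can never interleave with another request, which is what guarantees unique names.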
Controller
At this point all the actors we planned to implement are done. Now let's see how to connect the web socket to the actors. For that we use the tools provided by the Play framework. The application's controller is implemented as follows.
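Controller code
    import actors.{Receptionist, RoomWriter, UserConnection}
    import actors.UserConnection.{WebSocketInMsg, WebSocketOutMsg}
    import play.api.Play.current
    import play.api.libs.concurrent.Akka
    import play.api.libs.json.Json
    import play.api.mvc.{Action, Controller, WebSocket}
    import play.api.mvc.WebSocket.FrameFormatter

    object Application extends Controller {
      val logId = "akka-is-awesome"

      // Shared by every connection.
      val roomWriter = Akka.system.actorOf(RoomWriter.props(logId), "writer")
      val receptionist = Akka.system.actorOf(Receptionist.props(), "receptionist")

      def index = Action { implicit request =>
        Ok(views.html.chat())
      }

      // JSON (de)serialization of the web-socket frames.
      implicit val InMsgFormat = Json.format[WebSocketInMsg]
      implicit val InMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketInMsg]
      implicit val OutMsgFormat = Json.format[WebSocketOutMsg]
      implicit val OutMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketOutMsg]

      // One UserConnection actor is created per incoming socket.
      def socket = WebSocket.acceptWithActor[WebSocketInMsg, WebSocketOutMsg] { request => out =>
        UserConnection.props(receptionist, roomWriter, out, logId)
      }
    }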
First we create two actors, roomWriter and receptionist; they are the dependencies of the UserConnection actor. Then we describe how messages are formatted for transfer over the web socket. Finally, we describe how incoming web-socket connections are handled. Thanks to the helpers built into Play Framework this is very easy to do.
Now let's build the web interface. We use the Twitter Bootstrap framework for the layout and angular.js for the business logic on the client.
Client-side code
    angular.module('chatApp', [])
      .controller('ChatCtrl', ['$scope', function ($scope) {
        var wsUri = "ws://" + window.location.host + "/ws";
        var websocket = new WebSocket(wsUri);

        $scope.name = "";
        $scope.messages = [];
        $scope.registered = false;
        $scope.taken = false;

        // messageType 1 is SendMessage on the server side
        $scope.sendMessage = function () {
          websocket.send(angular.toJson({ "messageType": 1, "messageText": $scope.messageText }));
          $scope.messageText = "";
        };

        // messageType 0 is RegisterMeWithName on the server side
        $scope.sendName = function () {
          if (!$scope.registered) {
            websocket.send(angular.toJson({ "messageType": 0, "messageText": $scope.name }));
          }
        };

        websocket.onmessage = function (e) {
          var msg = angular.fromJson(e.data);
          console.log(e.data);
          if (!$scope.registered) {
            // Until registration succeeds, only system messages matter.
            switch (msg.from) {
              case "system":
                handleSystemMsg(msg.messageText);
                // This callback runs outside Angular's digest cycle, so trigger one by hand.
                $scope.$apply();
                break;
            }
          } else {
            $scope.messages.push(msg);
            $scope.$apply();
            // Keep the newest message in view.
            var chatWindow = $("#chat-window");
            chatWindow.scrollTop(chatWindow[0].scrollHeight);
          }
        };

        function handleSystemMsg(msg) {
          switch (msg) {
            case "welcome":
              $scope.registered = true;
              break;
            case "taken":
              $scope.taken = true;
              break;
          }
        }
      }]);
The HTML page looks like this.
Application HTML
    <!DOCTYPE html>
    <html ng-app="chatApp">
    <head>
      <meta charset="utf-8">
      <meta http-equiv="X-UA-Compatible" content="IE=edge">
      <meta name="viewport" content="width=device-width, initial-scale=1">
      <meta name="description" content="">
      <meta name="author" content="">
      <title>Akka WebSocket Chat</title>
      <link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet">
      <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.3.5/angular.min.js"></script>
      <link href="@routes.Assets.at("stylesheets/main.css")" rel="stylesheet">
      <script src="@routes.Assets.at("javascripts/chatApp.js")"></script>
    </head>
    <body>
      <div ng-controller="ChatCtrl">
        <nav class="navbar navbar-inverse navbar-fixed-top" role="navigation">
          <div class="container">
            <div class="navbar-header">
              <a class="navbar-brand" href="#">Reactive Messenger</a>
            </div>
            <form class="navbar-form navbar-left" ng-submit="sendName()" ng-show="!registered">
              <div class="form-group">
                <input type="text" class="form-control" ng-model="name" placeholder="Username" required>
              </div>
              <button type="submit" class="btn btn-default">Set name</button>
            </form>
          </div>
        </nav>
        <div class="container" >
          <div class="chat col-lg-6">
            <div id="chat-window">
              <ul class="list-group">
                <li class="list-group-item" ng-repeat="message in messages">
                  <span class="label label-info">{{message.time}}</span>
                  <span class="label label-default">{{message.from}}</span>
                  {{message.messageText}}
                </li>
              </ul>
            </div>
            <form ng-submit="sendMessage()">
              <div>
                <div class="input-group">
                  <input type="text" ng-model="messageText" class="form-control" required>
                  <span class="input-group-btn">
                    <button class="btn btn-default" type="submit">
                      Send<span class="glyphicon glyphicon-send" aria-hidden="true"></span>
                    </button>
                  </span>
                </div>
              </div>
            </form>
          </div>
        </div>
      </div>
      <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script>
      <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js"></script>
    </body>
    </html>
Scaling out
We have a prototype application, but it needs a little filing down before it can be rolled out to production. We will improve the following:
- Replace the ersatz journal with something really good. In our case we take Cassandra and use it to store events.
- The default Java serialization excels neither in stability when the message format changes nor in serialization speed. It is worth replacing with Google Protobuf or Kryo. In our case we use Protobuf.
- Messenger users want to keep up with the latest news and have no desire to read messages that are more than half an hour old. For that we will change the actors' logic and take a snapshot every 30 minutes, so that the whole message history does not have to be replayed each time a user connects.
- For the application to serve a large number of users, it is worth making it distributed.
When the application runs on several servers, its architecture changes only slightly. Akka actors have the property of location transparency, so the application can easily be spread across several servers. What is more, the actors will not even suspect that they have been separated, that they communicate over the network, and that they run on different servers. All we need is to add a little code; Akka does the rest.
Let's move on and describe what the application looks like after all the improvements. On the whole, the architecture undergoes small changes, but the idea stays the same.
To use Cassandra as the journal we need to:
- Install Cassandra itself.
- Use a plugin that persists the journal in Cassandra.
The first item is covered in detail in the official manual, so there is little point in repeating it here. Just note that there is no need to make every Cassandra node a seed node: for a cluster of three machines one seed is enough.
For the second, we need to specify the journal type in the configuration and list the addresses of the Cassandra nodes. This can be done as follows.
Akka-persistence configuration
    akka.persistence.journal.plugin = "cassandra-journal"
    cassandra-journal.contact-points = ["ip1", "ip2", "ip3"]
    akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
    cassandra-snapshot-store.contact-points = ["ip1", "ip2", "ip3"]
With Cassandra connected, we write our own class for serializing and deserializing messages. First we generate the required classes with the protobuf code generator, and then we use them to write the serializer.
The protobuf file looks like this.
Contents of the protobuf file
    option java_package = "actors.messages";
    option optimize_for = SPEED;

    message ChatMessage {
      optional string author = 1;
      optional string content = 2;
      optional int64 timestamp = 3;
    }
Once protobuf has generated the classes we need, we write the serializer.
Message serializer code
    import akka.serialization.Serializer

    // ProtoChatMessage is assumed to be the protobuf-generated class, imported under that alias.
    class ChatMessageSerializer extends Serializer {
      // Must be unique among all serializers registered in the actor system.
      def identifier: Int = 193823

      // Only one type is handled, so no manifest is needed.
      def includeManifest: Boolean = false

      def toBinary(obj: AnyRef): Array[Byte] = obj match {
        case ChatMessage(author, content, timestamp) =>
          ProtoChatMessage.newBuilder()
            .setAuthor(author)
            .setContent(content)
            .setTimestamp(timestamp)
            .build()
            .toByteArray
        case _ =>
          throw new IllegalArgumentException("unknown type " + obj.getClass)
      }

      def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
        val proto = ProtoChatMessage.parseFrom(bytes)
        ChatMessage(proto.getAuthor, proto.getContent, proto.getTimestamp)
      }
    }
We now have a proper journal and a way to write to it. Next we need a way to keep only the messages from the last ten minutes. For that we write our own buffer that stores messages for the last ten minutes.
Buffer code
    import scala.collection.mutable.ListBuffer

    class FixedTimeMessageBuffer(duration: Long) extends Traversable[ChatMessage] {
      val list = ListBuffer[ChatMessage]()

      def now = System.currentTimeMillis()
      def old = now - duration // anything stamped before this has expired

      def append(elem: ChatMessage) = {
        if (elem.timestamp > old) {
          // Drop expired messages from the front before adding the new one.
          while (list.nonEmpty && list.head.timestamp < old) {
            list.remove(0)
          }
          list.append(elem)
        }
      }

      override def toList = list.toList

      // Used when restoring the state from a snapshot.
      def replace(newList: List[ChatMessage]) = {
        list.clear()
        list ++= newList
      }

      def foreach[U](f: ChatMessage => U) = list.foreach(f)
    }
We chose ListBuffer as the data structure for storing messages because items are only ever added at the end and removed from the front, and ListBuffer performs both operations in constant time. Later we use this buffer in the reader actor to limit the number of messages sent to a newly connected client.
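The same sliding-window idea can be sketched with an explicit clock, which makes the eviction rule testable. This is a variation on the buffer, not the author's class: `TimeWindow` and `Stamped` are hypothetical names, a `mutable.Queue` is swapped in for `ListBuffer`, the current time is passed in by the caller, and (unlike the original) expired entries are evicted even when the incoming element is itself too old to keep.

```scala
import scala.collection.mutable

// Illustrative only: a time-window buffer with the clock supplied by the caller.
object TimeWindowSketch {
  final case class Stamped(text: String, timestamp: Long)

  // Keeps only the entries whose timestamp is newer than `now - windowMillis`.
  final class TimeWindow(windowMillis: Long) {
    private val entries = mutable.Queue.empty[Stamped]

    def append(elem: Stamped, now: Long): Unit = {
      val cutoff = now - windowMillis
      // Entries arrive in time order, so expired ones are always at the front.
      while (entries.nonEmpty && entries.head.timestamp <= cutoff) entries.dequeue()
      if (elem.timestamp > cutoff) entries.enqueue(elem)
    }

    def toList: List[Stamped] = entries.toList
  }
}
```

Both the append at the back and the removal at the front are constant-time operations, which is the property the text relies on.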
Now let's see how to split the actors across the network. So that the application does not crash when one of the nodes goes down, but instead waits for it to come back, we have to write the corresponding logic into the actors. The RoomWriter actor must notify RoomReader about new messages, so it is useful for it to know RoomReader's status. This logic is expressed well by introducing two states into the actor.
New methods of the RoomWriter class
    ...
    import context.dispatcher // execution context for the scheduler

    sendIdentifyRequest()

    // Ask the remote reader to identify itself and schedule a retry tick.
    def sendIdentifyRequest(): Unit = {
      log.info(s"Trying connecting to $roomReaderPath")
      context.actorSelection(roomReaderPath) ! Identify(roomReaderPath)
      context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
    }

    def receiveRecover = Actor.emptyBehavior
    def receiveCommand = identifying

    // The reader is not known yet: keep persisting, keep retrying.
    def identifying: Receive = {
      case msg: ChatMessage =>
        persistAsync(msg) { m => log.info(s"Message $m persisted, but the reader isn't available") }
      case ActorIdentity(`roomReaderPath`, Some(actor)) =>
        log.info(s"Successfully connected to $roomReaderPath")
        context.watch(actor)
        context.become(active(actor))
      case ActorIdentity(`roomReaderPath`, None) =>
        log.info(s"Remote actor is not available: $roomReaderPath")
      case ReceiveTimeout => sendIdentifyRequest()
      case _              => log.info("Not ready yet")
    }

    // The reader is known: notify it after every persisted message.
    def active(reader: ActorRef): Receive = {
      case msg: ChatMessage => persistAsync(msg) { _ => reader ! Update }
      case "snap"           => saveSnapshot("foo")
      case Terminated(`reader`) =>
        log.info("reader terminated")
        sendIdentifyRequest()
        context.become(identifying)
      case ReceiveTimeout => // stale retry tick, nothing to do
    }
In sendIdentifyRequest we try to obtain the ActorRef of the remote actor by sending it an Identify message. Every actor understands this message and replies with its ActorRef. Once we have received the ActorRef, we switch to normal operation. We also start watching the remote actor's lifecycle, and if it becomes unreachable we try to reach it again.
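The identify-and-retry loop boils down to a two-state machine, which can be written as a pure transition function. The sketch below is an assumption-based model rather than Akka code: `State`, `Event`, `Action`, and `step` are hypothetical, references are plain strings, and the types named `Identified`, `Timeout`, and `Terminated` are local stand-ins for Akka's `ActorIdentity`, `ReceiveTimeout`, and `Terminated`.

```scala
// Illustrative only: the reconnect logic as a pure state machine.
object ReconnectSketch {
  sealed trait State
  case object Identifying extends State
  final case class Active(reader: String) extends State

  sealed trait Event
  final case class Identified(reader: Option[String]) extends Event // reply to Identify
  case object Timeout extends Event                                  // scheduled retry tick
  final case class Terminated(reader: String) extends Event         // watched actor died

  sealed trait Action
  case object SendIdentify extends Action
  final case class Watch(reader: String) extends Action

  // Returns the next state and the side effects the actor would perform.
  def step(state: State, event: Event): (State, List[Action]) = (state, event) match {
    case (Identifying, Identified(Some(r)))          => (Active(r), List(Watch(r)))
    case (Identifying, Identified(None))             => (Identifying, Nil) // wait for the next tick
    case (Identifying, Timeout)                      => (Identifying, List(SendIdentify))
    case (Active(r), Terminated(dead)) if r == dead  => (Identifying, List(SendIdentify))
    case (other, _)                                  => (other, Nil) // e.g. a stale Timeout while active
  }
}
```

Writing the transitions out this way makes it easy to see that a retry tick arriving after a successful connection is harmless.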
To give the UserConnection actor similar logic, we create another actor that acts as an intermediary when talking to the backend.
BackendTalker class code
    class BackendTalker(roomWriterPath: String, roomReaderPath: String) extends Actor with ActorLogging {
      import BackendTalker._

      // User connections that want updates; re-subscribed whenever the reader comes back.
      val listeners = collection.mutable.Set[ActorRef]()

      sendReaderIdentifyRequest()
      sendWriterIdentifyRequest()

      def sendReaderIdentifyRequest(): Unit = {
        log.info("sending identify request to reader")
        context.actorSelection(roomReaderPath) ! Identify(roomReaderPath)
        import context.dispatcher
        context.system.scheduler.scheduleOnce(3.seconds, self, ReaderReceiveTimeout)
      }

      def sendWriterIdentifyRequest(): Unit = {
        log.info("sending identify request to writer")
        context.actorSelection(roomWriterPath) ! Identify(roomWriterPath)
        import context.dispatcher
        context.system.scheduler.scheduleOnce(3.seconds, self, WriterReceiveTimeout)
      }

      def receive = identifying

      // Neither backend actor is known yet.
      def identifying: Receive = {
        case ActorIdentity(`roomWriterPath`, Some(actor)) =>
          log.info(s"Successfully identified writer at $roomWriterPath")
          context.watch(actor)
          context.become(waitingForReader(actor))
        case ActorIdentity(`roomReaderPath`, Some(actor)) =>
          log.info(s"Successfully identified reader at $roomReaderPath")
          listeners.foreach(actor ! Listen(_))
          context.watch(actor)
          context.become(waitingForWriter(actor))
        case ActorIdentity(path, None) => log.info(s"Remote actor not available: $path")
        case ReaderReceiveTimeout => sendReaderIdentifyRequest()
        case WriterReceiveTimeout => sendWriterIdentifyRequest()
        case msg: ChatMessage =>
          listeners += context.watch(sender())
          sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
        case Terminated(userCon) => listeners -= userCon
        case _ => log.info("Not ready yet")
      }

      // The writer is known, the reader is not.
      def waitingForReader(writer: ActorRef): Receive = {
        case ActorIdentity(`roomReaderPath`, Some(reader)) =>
          log.info(s"Successfully identified reader at $roomReaderPath")
          listeners.foreach(reader ! Listen(_))
          context.watch(reader)
          context.become(active(reader, writer))
        case ActorIdentity(`roomReaderPath`, None) =>
          log.info(s"Reader actor not available: $roomReaderPath")
        case ReaderReceiveTimeout => sendReaderIdentifyRequest()
        case WriterReceiveTimeout => sendWriterIdentifyRequest()
        case Terminated(`writer`) =>
          log.info("writer terminated")
          sendWriterIdentifyRequest()
          context.become(identifying)
        case msg: ChatMessage =>
          listeners += context.watch(sender())
          sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
        case Terminated(userCon) => listeners -= userCon
        case _ => log.info("Not ready yet")
      }

      // The reader is known, the writer is not.
      def waitingForWriter(reader: ActorRef): Receive = {
        case ActorIdentity(`roomWriterPath`, Some(writer)) =>
          log.info(s"Successfully identified writer at $roomWriterPath")
          context.watch(writer)
          context.become(active(reader, writer))
        case ActorIdentity(`roomWriterPath`, None) =>
          log.info(s"Writer actor not available: $roomWriterPath")
        case ReaderReceiveTimeout => sendReaderIdentifyRequest()
        case WriterReceiveTimeout => sendWriterIdentifyRequest()
        case Terminated(`reader`) =>
          log.info("reader terminated")
          sendReaderIdentifyRequest()
          context.become(identifying)
        case msg: ChatMessage =>
          listeners += context.watch(sender())
          sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
        case Terminated(userCon) => listeners -= userCon
        case _ => log.info("Not ready yet")
      }

      // Both backend actors are known: just relay.
      def active(reader: ActorRef, writer: ActorRef): Receive = {
        case l: Listen        => reader ! l
        case msg: ChatMessage => writer ! msg
        case Terminated(`reader`) =>
          log.info("reader terminated")
          sendReaderIdentifyRequest()
          context.become(waitingForReader(writer))
        case Terminated(`writer`) =>
          log.info("writer terminated")
          sendWriterIdentifyRequest()
          context.become(waitingForWriter(reader))
        case ReaderReceiveTimeout => // stale retry tick, nothing to do
        case WriterReceiveTimeout => // stale retry tick, nothing to do
      }
    }
Here we implement the same waiting-for-a-remote-actor logic as in the RoomWriter actor. The logic is a little more involved in this case, because we have to wait for connections to two actors at once.
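One way to keep track of the four behaviours is to notice that they correspond to which of the two references is currently known. The sketch below is an alternative illustration, not the author's design: `Links`, `phase`, `readerUp`, `writerUp`, and `terminated` are hypothetical names, and references are modelled as strings.

```scala
// Illustrative only: the four BackendTalker states derived from two optional references.
object TwoDependenciesSketch {
  // None means "not identified yet".
  final case class Links(reader: Option[String] = None, writer: Option[String] = None) {
    // Which of the four behaviours this combination corresponds to.
    def phase: String = (reader, writer) match {
      case (None, None)       => "identifying"
      case (None, Some(_))    => "waitingForReader"
      case (Some(_), None)    => "waitingForWriter"
      case (Some(_), Some(_)) => "active"
    }

    def readerUp(ref: String): Links = copy(reader = Some(ref))
    def writerUp(ref: String): Links = copy(writer = Some(ref))

    // Losing either side drops just that link; the other one is kept.
    def terminated(ref: String): Links =
      copy(reader = reader.filterNot(_ == ref), writer = writer.filterNot(_ == ref))
  }
}
```

In this form every transition touches only one field, at the cost of losing the compile-time guarantee that `active` always has both references at hand.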
One finishing touch remains: we rewrite RoomReader a little to limit the number of messages a user receives.
This takes just a few extra lines.
In the constructor we define the buffer for storing messages and a helper method for working with it. We also start a scheduler that issues a command to take a snapshot once every ten minutes. Note that the command is issued by sending a message to the actor rather than by calling saveSnapshot directly. This is deliberate, so as not to violate the principle that an actor's mutable data is manipulated only by the actor itself. Violating that principle can lead to subtle bugs.
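Additions to RoomReader
    import scala.concurrent.duration._
    import context.dispatcher // execution context for the scheduler

    // Every ten minutes, tell ourselves to take a snapshot.
    context.system.scheduler.schedule(10.minutes, 10.minutes, self, Snap)

    // Holds the messages of the last ten minutes (the buffer takes milliseconds).
    val state = new FixedTimeMessageBuffer(10.minutes.toMillis)

    def updateState(msg: ChatMessage) = state.append(msg)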
In the receive method we add saving a snapshot when the special message arrives, and we also implement correct restoration of the state from a snapshot.
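Additions to RoomReader
    case msg: ChatMessage =>
      updateState(msg)
      sendAll(msg)
    case Listen(ref) =>
      listeners add context.watch(ref)
      // Bring the new listener up to date with the buffered messages.
      state.foreach(ref ! _)
    case Snap =>
      saveSnapshot(state.toList)
    // The element type is erased at runtime, so this match is unchecked.
    case SnapshotOffer(_, snapshot: List[ChatMessage]) =>
      state.replace(snapshot)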
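Why snapshots speed up recovery can be shown with a short model: the state is restored from the latest snapshot and only the events recorded after it are replayed. The names here (`Msg`, `Snapshot`, `recover`) and the use of sequence numbers are assumptions made for the sketch; they mirror the general idea, not the exact Akka persistence API.

```scala
// Illustrative only: recovery from an optional snapshot plus the tail of the journal.
object SnapshotSketch {
  final case class Msg(seqNr: Long, text: String)

  // A snapshot records the state together with the last sequence number it covers.
  final case class Snapshot(upToSeqNr: Long, state: List[String])

  // Start from the snapshot (if any) and replay only the events after it.
  def recover(snapshot: Option[Snapshot], journal: Seq[Msg]): List[String] = {
    val start = snapshot.getOrElse(Snapshot(0L, Nil))
    journal
      .filter(_.seqNr > start.upToSeqNr)
      .foldLeft(start.state)((state, m) => state :+ m.text)
  }
}
```

Recovering from a snapshot must give the same result as replaying the full journal; the only difference is how many events have to be read.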
To sum up, we have implemented a modern web application in the spirit of reactive programming. It responds to user requests quickly and offers a certain degree of resilience. There is still plenty of room for improvement, though, for example with akka-cluster and akka-streams.