ããŸããŸãªãããžã§ã¯ãããµããŒãããéçšã§ã Promise
äžé©åãªäœæ¥ã®ããã«çç£Promise
åé¡ãçºçããç¶æ³ã«äœåºŠãééããŸããã ããã«ããã®éåžžã«ééã£ãäœæ¥ã®ãã¿ãŒã³ã¯åžžã«åãã§ããããç°ãªãè£
ãã«é ããŠããŸããã ããã«ã誀ã£ãã³ãŒãã¯ããŸããŸãªäººã
ã«ãã£ãŠæžãããŸããã ããã«ã Promise
ã§ã®äœæ¥ã«é¢ããèšäºã§ã¯ã匷調ãããåé¡ã®èšåãæ¬åœã«èŠã€ããŸããã ã ããç§ã¯å€ãã®äººãç§ã話ãåé¡ãå¿ããŠãããšæããŸãã
çŽæãæªæ¥ããããŠåœ¹è
ãšãšãã«ãScalaã§éåæã³ãŒãã®å€ãã®äŸãèªãã®ã¯é¢çœãã§ããïŒ ç«ãžããããïŒ
æªæ¥ã«ã€ããŠå°ã
æå§ãã«ã Future
ã«ã€ããŠã®ã¡ãã£ãšããçè«ã玹ä»ããŸãã Scalaæšæºã©ã€ãã©ãªã®ãã®ã¿ã€ãã«ç²ŸéããŠããå Žåããã®éšåã¯å®å
šã«ã¹ãããã§ããŸãã
Scalaã¯Future[T]
åã䜿çšããŠéåæèšç®ã衚ããŸãã DBMSããããŒå€ãæœåºããå¿
èŠããããšããŸãã ãã®ãããªãªã¯ãšã¹ãã®åæé¢æ°ã®çœ²åã¯æ¬¡ã®ããã«ãªããŸãã
trait SyncKVStore[K, V] { def get(key: K): V }
ãã®åŸã次ã®ããã«äœ¿çšã§ããŸãã
val store: SyncKVStore[String, String] = ... val value = store.get("1234")
ãã®ã¢ãããŒãã«ã¯æ¬ ç¹ããããŸãïŒ get()
ã¡ãœããã¯ãããã¯ããå¯èœæ§ããããæ¯èŒçé·ãæéãããã¯ãŒã¯äžã§ããŒã¿ãéä¿¡ãããå¯èœæ§ãããããã§ãã ãã³ããããã³ã°ã«ããããšãããšãç¹æ§ã¯æ¬¡ã®ããã«ãªããŸãã
import scala.concurrent.Future trait KVStore[K, V] { def get(key: K): Future[V] }
å®è£
ãæ£ããæžãæããã«ã¯ã䜿çšããDBMSã®éåæãã©ã€ããŒã䜿çšããå¿
èŠãããå¯èœæ§ããããŸãã äŸãšããŠããããäžã«ã¡ã¢ãªå
å®è£
ãèšè¿°ããŸãã
import scala.concurrent.ExecutionContext class DummyKVStore(implicit ec: ExecutionContext) extends KVStore[String, String] {
ååŸããå€ã¯ãããšãã°æ¬¡ã®ããã«äœ¿çšã§ããŸãã
Futureã«ã¯ãå€ãåŠçããããã®äŸ¿å©ãªéåæã¡ãœãããããã€ããããŸã;ãããã®ããã€ããç°¡åã«èª¬æããŸãã
map(f: A => B)
-å
ç©ã®çµããã«ã颿°f
ã䜿çšããŠæåããçµæã倿ããŸãflatMap(f: A => Future[B])
-æåããçµæã倿ããŸãããå®éã«ã¯éåæé¢æ°ãåãå
¥ããŸãforeach(f: A => Unit)
-æ£åžžã«å®äºãããšã颿°f
å®è¡ããå
ç©ã®çµæãæž¡ããŸãonComplete(f: Try[A] => Unit)
ãšåãã§ãããä»»æã®å®äºæã«æ©èœãå®è¡ããŸãã ãšã©ãŒãã
ååããæšæž¬ã§ããããã«ãããããã¹ãŠã®ã¡ãœããã¯implicit ec: ExecutionContext
ãã©ã¡ãŒã¿ãŒã䜿çšããããšãæç¢ºã«ããããšã䟡å€ããããŸããããã«ã¯ãå
ââç©ã®å®è¡ã®ã³ã³ããã¹ãã«é¢ããæ
å ±ãå«ãŸããŸãã
å
ç©ã®è©³çްã«ã€ããŠã¯ãäŸãã°ãã¡ããã芧ãã ãã ã
çŽæïŒçè«ã®ããã«2ã€ã®æ®µèœ
Promise
ãšã¯äœã§ããïŒ å®éãããã¯å
ç©ãå«ãåæå®ããã远èšåã³ã³ããã§ãã
val p = Promise[Int]() p.success(42) p.future
å
ç©ãå®äºããããã®å€ãã®æ¹æ³ããããŸããäŸãã°ïŒ
success(t: T)
-çµæt
ãã¥ãŒãã£ãŒãçµäºããŸãfailure(e: Throwable)
e
å
ç©ããã¡ã€ã«ããcomplete(try: Try[T])
- try
ãSuccess
å Žåãå
ç©ãçµäºããŸãã 倱æããã倱æcompleteWith(future: Future[T])
- future
çµäºãããšãã«å
éšfuture
å®äºããŸã
ãããã£ãŠãPromiseã¯å
ç©ãäœæããããã«äœ¿çšã§ããŸãã
çŽæïŒäœ¿çšã®å®è·µã«æ²¡é ãã
ã³ãŒã«ããã¯ã§æ¢è£œã®éåæJava APIã®äžã«ç¬èªã®éåæAPIãå®è£
ããããšãæ³åããŠãã ããã å
ç©ã§è¿ãããçµæã¯ã³ãŒã«ããã¯ã§ã®ã¿äœ¿çšã§ããããã Future.apply()
ã¡ãœãããçŽæ¥äœ¿çšããããšã¯ã§ããŸããã ããã§ã Promise
ã圹ç«ã¡ãŸãã SOãžã®ãã®çãã«ã¯ãå®äžçã§Promise
ã䜿çšããäžèŠçŽ æŽãããäŸããããŸãã
def makeHTTPCall(request: Request): Future[Response] = { val p = Promise[Response]() registerOnCompleteCallback { buffer => val response = makeResponse(buffer) p.success(response) } p.future }
ããŠãAkka-HTTPãªã©ã®æ°ããWebãµãŒãã¹ã§ãã®é¢æ°ã䜿çšããŸãã éå§ããã«ã¯ãSBTã§äŸåé¢ä¿ãæ¥ç¶ããŸãã
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10"
ãããŠããµãŒãã¹ã³ãŒããèšè¿°ããŸãã
import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshalling.GenericMarshallers.futureMarshaller import scala.concurrent.ExecutionContext.Implicits.global object WebService extends App {
泚ïŒAkka-HTTPã®complete()
ã¡ãœããã¯Future[T]
åãå
¥ããããšãã§ãããã®ããã«futureMarshaller
ã€ã³ããŒããããŸãã å
ç©ã®å®äºåŸã«HTTPãªã¯ãšã¹ãã«å¿çããŸãã
ãŸããHTTP APIãä»ããŠããŒã¿ããŒã¹ããã®ãã¹ãŠã®ã¡ãŒã«ã«ç¹å®ã®ããŒã§å€ãéä¿¡ããã¿ã¹ã¯ãåè«ã«ããããšã決å®ããŸããã ãããŠã圌ã¯ãµã€ã¯ã«ã§ãããããŸãïŒé
åžã®å®äºåŸããã¹ãŠã®ã¯ã©ã€ã¢ã³ãã¯åã³ãããããå§ããŸãã
def allEmails: Seq[String] = ... def sendEmails(implicit ec: ExecutionContext): Future[Unit] = { Future.sequence { for { email <- allEmails } yield for { value <- store.get("42") response <- makeHTTPCall(new SendMailRequest(email, value)) } yield response }.flatMap(_ => sendEmails)
ãã¹ãŠãéçšç°å¢ã«é
眮ããŸãã ãã ããæ°æ¥åŸãAPIã³ã³ã·ã§ã«ãžã¥ãç§ãã¡ã®ãšããã«æ¥ãŠãã¿ã€ã ã¢ãŠãã«ãããµãŒãã¹ã®å®æçãªããŒã«ã¢ãŠãã«ã€ããŠèŠæ
ãç³ãç«ãŠãŸãã ãããŠ3æ¥åŸãã¿ã¹ã¯ãã¡ãŒã«ã®éä¿¡ã忢ããããšãããããŸããïŒ åé¡ã¯äœã§ããïŒ
ãã°ã«ã¯ã次ã®ãã¬ãŒã¹ããããŸãã
some.package.SomeException at some.package.makeResponse(...) at some.package.$anonfun$makeHTTPCall$1(...) ...
makeResponse()
ã¡ãœãããå®è¡ãmakeResponse()
ããããšãmakeResponse()
ã makeHTTPCall()
ãœãŒã¹ãèŠããšããã®å Žåããããè¿ãå
ç©ã¯æ±ºããŠçµãããªãããšãããããŸãïŒ
val p = Promise[Response] registerOnCompleteCallback(buffer => { val response = makeResponse(buffer)
ããããã¿ã€ã ã¢ãŠãã«ãã£ãŠAPIãèœã¡ãã¡ãŒã«éä¿¡ãµã€ã¯ã«ãæ©èœããªããªã£ãçç±ã§ãã æ®å¿µãªãããScalaã§ã¯ã å€ãã®äººãæãããã«ãã©ã®é¢æ°ãå®è¡ãè¿ãããšãã§ãããšèããã«ããã°ã©ãã³ã°ããããšã¯ã§ããŸãã ...
ãã®ããã Try.apply()
ã¡ãœããã¯å®è¡ãã€ã³ã¿ãŒã»ããããå€ã§Success
ãè¿ãããäŸå€ãã¹ããŒããŠFailure
ãè¿ãããšãã§ããããšãæãåºããŠTry.apply()
ã ã©ã ãæ¬äœãåçŽãªæ¹æ³ã§ä¿®æ£ããã³ãŒãã«ã¬ãã¥ãŒãéä¿¡ããŸãã
import scala.util._ Try(makeResponse(buffer)) match { case Success(r) => p.success(r) case Failure(e) => p.failure(e) }
ãã ããã¬ãã¥ãŒã§ã¯ãpromiseã«ã¯complete()
ã¡ãœããããããããèªäœãæã§æžãããã®ãšåãããšãè¡ãããšãããããŸãã
p.complete(Try(makeResponse(buffer))
Promise
ã«ã€ããŠåŠãã ããšïŒ
Promise
ã¡ãœããã®æåã«å®£èšããããã®futuresãæåŸã«è¿ãããå Žåãããã¯ãã®futuresãçµäºããããšãæå³ãããã®ã§ã¯ãããŸãããPromise
ãåžžã«éããªããã°ãªããªããªãœãŒã¹ãšèãããšäŸ¿å©ã§ãã ãã ãããããã¹ã¯éåžžãç°ãªãã¹ã¬ããã§å®£èšãããŠéãããããããæšæºèšèªæ§æïŒ try-finally
ïŒãŸãã¯ã©ã€ãã©ãªïŒ scala-arm
ïŒã䜿çšããŠãªãœãŒã¹ãšããŠäœ¿çšããããšã«ã¯åé¡ããããŸãã
ãããã誰ãããããã¯äººçºçãªäŸã§ãããå®ç掻ã§ã¯èª°ãçŽæãå¿ããããšãå¿ããªããšèšãã§ããããïŒ ãŸãããã®ãããªæçè«è
ã®ããã«ãç§ã¯Akkaã®ããã€ãã®æ¬åœã®ãã°/ PRã®åœ¢ã§çããæã£ãŠããŸãã
- QueueSourceã¯ãçªç¶ã®çµäºæã«onComplete futureãå®äºããŸãã
- IgnoreSinkã¯ãçªç¶ã®çµäºæã«mat futureãå®äºããŸãã
- å®äºãããã¥ãŒã®ãªãã¡ãŒãåŒã³åºããšã決ããŠå®äºã§ããªãæªæ¥ãè¿ãããŸã
ããã«ããã¹ãŠããã®äŸã®ããã«åçŽã§æçœã§ãããšã¯éããŸããã
çè«ã®æåŸã®éšåïŒä¿³åªãžã®å°ããªç޹ä»
ã¢ã¯ã¿ãŒã«ç²ŸéããŠãã人ã¯ããã®ããŒããã¹ãããã§ããŸãã
ãã®èšäºã®åŸåã§ãAkkaã¢ã¯ã¿ãŒã«ã€ããŠå°ãç¥èãå¿
èŠã«ãªããŸãã akka-actorã¢ãžã¥ãŒã«ããããžã§ã¯ãã«æ¥ç¶ããŸãïŒSBTã®äŸïŒïŒ
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.7"
Akkaã®ã¢ã¯ã¿ãŒã¯ãéåæçã«ã¡ãã»ãŒãžãåä¿¡ããããã€ãã®åäœãæã€ãªããžã§ã¯ãã§ãã ããã©ã«ãã§ã¯ãåäœã¯receive
ã¡ãœããã§æ±ºå®ããreceive
ã
import akka.actor._ import akka.actor.Actor.Receive val log: Logger = ourLogObject() case object HelloRequest class HelloActor extends Actor {
äœæåŸãã¢ã¯ã¿ãŒã¯çŽæ¥ã¢ã¯ã»ã¹ã§ããŸãããã ActorRef
ãšåŒã°ãããããã·ãä»ããŠã¢ã¯ã»ã¹ã§ããŸãã ãã®ãããã·ãä»ããŠãã¡ãœããã䜿çšããŠéåæã§ã¡ãã»ãŒãžãéä¿¡ã§ããŸã!
ïŒ tell
ãšã€ãªã¢ã¹ïŒããããã®ã¡ãã»ãŒãžã¯ãã¢ã¯ã¿ãŒã®åäœã決å®ããã¡ãœããã«ãã£ãŠåŠçãããŸãã ã¡ãã»ãŒãžã¯ã·ãªã¢ã©ã€ãºå¯èœã§ãªããã°ãªããªããããå€ãã®case object
ãã¡ãã»ãŒãžçšã®case object
äœæãããŸãïŒã¡ãã»ãŒãžã®ãã©ã¡ãŒã¿ãŒããªãcase class
ïŒã ã¢ã¯ã¿ãŒã¯äžåºŠã«1ã€ã®ã¡ãã»ãŒãžããåŠçã§ããªããããåæããªããã£ããšããŠã䜿çšã§ããŸãã
ãã1ã€ã®éèŠãªãã€ã³ãããããŸããã¢ã¯ã¿ãŒã¯ãã®åäœæ©èœã倿Žã§ãããã€ãŸãå®éã«ã¡ãã»ãŒãžãåŠçã§ããŸãã ãããè¡ãã«ã¯ãã¢ã¯ã¿ãŒã¯context.become(newReceive)
ã¡ãœãããåŒã³åºãå¿
èŠããããŸãnewReceive
ã¯Receive
ãã©ã¡ãŒã¿ãŒã§ãã ãã®åŸã次ã®ã¡ãã»ãŒãžããéå§ããŠãããã©ã«ãã®receive
代ããã«newReceive
颿°ã«ããåŠçãéå§ãããŸãã
ãã¿ãŒã³ã®åéšåãæ¥ç¶ããïŒçŽæã俳åªã«æž¡ã
ããã§ã¯ã次ã®äŸã«ç§»ããŸãããã
äœããã®ãµãŒãã¹ã®ã¯ã©ã€ã¢ã³ããäœæããå¿
èŠããããŸãã ããšãã°ãäºçŽçšã idã§ããã«æ
å ±ãåä¿¡ã§ããããã«ããããšããŸãã
case class Hotel(id: Long, name: String, country: String) // trait BookingClient { def getHotel(id: Long): Future[Hotel] }
次ã«ãäºçŽAPIã«ã¢ã¯ã»ã¹ããŠå¿çãåŠçããã¡ãœãããå®çŸ©ããå¿
èŠããããŸãã ãããè¡ãã«ã¯ãAkka-HTTPã©ã€ãã©ãªã®éåæHTTPã¯ã©ã€ã¢ã³ãã䜿çšããŸãã äŸåé¢ä¿ã§æ¥ç¶ããŸãïŒ
libraryDependencies ++= "com.typesafe.akka" %% "akka-http" % "10.0.10"
圌ãã¯ãæ¯èŒç倧ããªRPSã§çæéã¡ãœãããå®è¡ãããã®ã§ãããå¿çæéã¯ããã»ã©éèŠã§ã¯ãããŸããã Akka-HTTPã¯ã©ã€ã¢ã³ãã«ã¯ç¹ç°æ§ããããŸãïŒ akka.http.host-connection-pool.max-connectionsèŠæ±ä»¥äžã®äžŠåå®è¡ãèš±å¯ããŸãã ã éåžžã«åçŽãªãœãªã¥ãŒã·ã§ã³ã䜿çšããŸãããã¹ãŠã®ãªã¯ãšã¹ããã¢ã¯ã¿ãŒãééããããã«ããŸããã€ãŸãã1ã€ã®ã¹ããªãŒã ã«ãªããŸãïŒå®éã®ãœãªã¥ãŒã·ã§ã³ã¯ããå°ãè€éã§ããããããã¯ãã®äŸã§ã¯éèŠã§ã¯ãããŸããïŒã å
ç©ãè¿ãããã®ã§ãçŽæãäœæããŠã¢ã¯ã¿ãŒã«æž¡ããŸãããã§ã«ã¢ã¯ã¿ãŒã§ã³ãããããŸãã
ç¹°ãè¿ãã«ãªããŸããããã¹ãŠãåºããåŸãæåã¯ãã¹ãŠããŸããããŸããããã getHotel()
Method Returns Incomplete FuturesããšããèŠåºãã®ãã°ã¬ããŒãããããŸããã ãªããããèµ·ãã£ãã®ã§ããïŒ ç§ãã¡å
šå¡ãæ³åããããã«èŠããŸãããã©ã ãæ¬äœå
šäœã«å¯ŸããŠcompleteWith()
ã¡ãœããã䜿çšããŸãã...ããã«ãããããããç¹å®ã®æ¡ä»¶äžã§ã¯ãå
ç©ã¯ãŸã æ®ã£ãŠããŸãã
åé¡ã¯ã foreach()
ã¡ãœããã«æž¡ãããã©ã ãã¯ãæçµçãªeventual
æ£åžžã«å®äºãããšãã«ã®ã¿éå§ãããããšã§ãã ãããã£ãŠããã®å
ç©ã倱æããå ŽåïŒããšãã°ãããããèœã¡ãå ŽåïŒãçŽæã¯æ±ºããŠneverããããŸããïŒ
ä¿®æ£ã¯æ¯èŒçç°¡åã§ãããšæ³å®ã§ããŸãonComplete()
代ããã«onComplete()
ã䜿çšããæž¡ãããã©ã ãã®ãšã©ãŒãåŠçããå¿
èŠããããŸãã ãã®ãããªãã®ïŒ
eventual.onComplete { case Success(response) =>
ããã«ãããç¹ã«ãã¥ãŒãžã§ã³ã®ãã¥ãŒãžã§ã³ã«ãããã¥ãŒãã£ãŒå
ç©ã®åé¡ã解決ãããŸããããã®æ¹æ³ã§éä¿¡ããããã¥ãŒãã£ãŒå
ç©ã®åé¡ããã¹ãŠè§£æ±ºã§ããããã§ã¯ãããŸããã
ãããªãæšè«ãåçŽåããããã«ãå°æ¥ãå®äºããããã«ã¢ã¯ã¿ãŒã«çŽæã転éãããããåçŽãªãåæã«ããäžè¬çãªäŸãå®è£
ããŸãã
case class Request(arg: String, p: Promise[String]) trait GimmeActor { val actor: ActorRef def function(arg: String): Future[String] = { val p = Promise[String]() actor ! Request(arg, p) p.future } }
ã¡ãªã¿ã«ãããã£funcion()
ã®æ§ç¯ã¯ãããšãã°Akkaãä»ã®ã©ã€ãã©ãªã®ãœãŒã¹ã³ãŒãã«ããèŠãããŸãã åãAkkaã«ã¯ããã®ãã¿ãŒã³ã«åŸã£ãŠæžãããPromise
æ°åã®äœ¿çšæ³ããããŸãã
Promise
äœæ- éåæé¢æ°ãžã®åŒæ°ã®1ã€ãšããŠæž¡ããŸãïŒAkkaã®ãœãŒã¹ã§ã¯ãããã¯å€ãã®å Žåãåã«
actor.tell()
åŒã³åºãã§ãïŒ Promise
ã«ãã£ãŠäœæãããfuture
ãã£ãŒã«ããè¿ããŸãã
æç¢ºã«ããããã®ããã€ãã®äŸïŒ
- ããã§ã¯ãL129ã®ã³ãŒã«ããã¯ã§ã
p
ããã¢ã¯ã¿ãŒãžã®è»¢éã®ã¿ãè¡ãããŸãã - ãããŠããã§ãã¢ã¯ã¿ãŒãäœæããããã®ãã©ã¡ãŒã¿ãŒã«ãããã¹ãçŽæ¥éä¿¡ãããŸãã
ãã®ãã¿ãŒã³ã®äœ¿çšã«ã¯ãå°ãªããšãããã€ãã®åé¡ããããŸãã
- 2ã€ã®ãµã³ãã«ã¯ã©ã¹ïŒ
class DoesntEvenKnowAboutRequest extends Actor { def receive: Receive = { case PoisonPill => } } class HandlesRequestWrongWay extends Actor { def receive: Receive = { case Request(arg, p) => } }
ã©ã¡ãã®å ŽåããçŽæã¯æããã«å®äºããŸããã
äžæ¹ã§ã¯ããããã®äŸã¯ããŸãã«ãåæçã§ãããšèšããŸãã äžæ¹ãã³ã³ãã€ã©ã¯ãã®ãããªãšã©ãŒããç§ãã¡ãä¿è·ããŸããã ããã«ããã®åé¡ã¯ã¢ã¯ã¿ãŒã ãã§ãªããPromiseãéåžžã®éåæé¢æ°ã«è»¢éããå Žåã«ãèŠããŠããå¿
èŠããããŸãã
- ã¢ã¯ã¿ãŒã®å®è£
ãæ£ããèšè¿°ãããªã¯ãšã¹ããåãåã£ããšãã«åžžã«çŽæãå®äºãããŸãã
class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg, p) => p.success("42") }
ç§ãã¡ã¯ãã¹ãŠé 調ã§ããã®ã¢ã¯ã¿ãŒã䜿çšãããµãŒãã¹ã¯å®ç§ã«æ©èœããŸãã ãã ããæåã«1ã€ã®ãµãŒããŒã§ãµãŒãã¹ãéå§ããããšã«ããŸããã
ããããä»ã§ã¯äŒç€Ÿãæé·ããŠããããŠãŒã¶ãŒã®æ°ãå¢ããŠããŸããããã«äŒŽããã¢ã¯ã¿ãŒãžã®ãªã¯ãšã¹ãã®æ°ãå¢ããŠããŸãã è² è·ã®ããŒã¯æã«ãã¢ã¯ã¿ãŒã¯å¿
èŠãªæéå
ã«ã¡ãã»ãŒãžãããŒã«å¯ŸåŠããæéããªããªã£ããããæ°Žå¹³ã¹ã±ãŒãªã³ã°ãè¡ãããšã«ããŸãããã¯ã©ã¹ã¿ãŒå
ã®ç°ãªãããŒãã§AlwaysCompletesRequest
ãå®è¡ããŸãã ã¯ã©ã¹ã¿ãŒãæŽçããã«ã¯ãakka-clusterã䜿çšããå¿
èŠããããŸãããç°¡åã«ããããã«ã¯ã©ã¹ã¿ãŒãæŽçããã1ã€ã®ãªã¢ãŒãã¢ã¯ã¿ãŒAlwaysCompletesRequest
èŠãŠAlwaysCompletesRequest
ã
äž¡æ¹ã®JVMã§akka-remoteããµããŒãããActorSystem
ãäœæããå¿
èŠããããŸãïŒã¢ã¯ã¿ãŒãžã®ã¢ã¯ã»ã¹ãšãã¹ãã ãããè¡ãã«ã¯application.conf
äž¡æ¹ã®ãµãŒãã¹ã®application.conf
æ¬¡ã®æ§æã远å ããŸãã
akka { actor { provider = remote } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "some-hostname" port = 2552 # ; - } } }
SBTã®äŸã®ããã«ãäž¡æ¹ã®ãµãŒãã¹ã«akka-remoteäŸåé¢ä¿ã远å ããå¿
èŠããããŸãã
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.7"
ãµãŒããŒäžã§ã¢ã¯ã¿ãŒãäœæããŸãïŒ
val system = ActorSystem("server") system.actorOf(Props(new AlwaysCompletesRequest), "always-complete-actor")
次ã«ãã¯ã©ã€ã¢ã³ãã§ååŸããŸãã
val system = ActorSystem("client") val actor = system.actorSelection("akka.tcp://server@some-hostname:2552/user/promise-actor") val gimme = new GimmeActor { override val actor: ActorRef = actor } val future = gimme.function("give me 42, please!")
ãããŠãã¯ã©ã€ã¢ã³ããšãšãã«ãµãŒããŒãèµ·åããŸãã...ãããŠããã«è¡ãè©°ãŸããŸããã
ãã°ã«æ¬¡ã®äŸå€ããããŸãã
akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.actor.ActorSelectionMessage] using serializer [class akka.remote.serialization.MessageContainerSerializer]. Caused by: java.io.NotSerializableException: scala.concurrent.impl.CallbackRunnable
ãããã£ãŠããªã¢ãŒãã¢ã¯ã¿ãŒã«ãããã¹ãéä¿¡ããããšãããšããããã¹ã®ã·ãªã¢ã«åãšã©ãŒãéåžžã«äºæ³ã©ããã«çºçããŸããã å®éãããšãã·ãªã¢ã«åããŠçŽæãæž¡ãããšãã§ãããšããŠãããªã¢ãŒãJVMã§ãžã£ã ã«ãªãã ãã§ãJVMã§ã¯ã¹ã¿ãã¯ãããŸãŸã«ãªããŸãã ãããã£ãŠãã¢ã¯ã¿ãŒã«ãããã¹ã転éããããšã¯ããŒã«ã«ã¡ãã»ãŒãžã³ã°ã§ã®ã¿æ©èœããŸããã€ãŸããã¢ã¯ã¿ãŒã«ãããã¹ãéä¿¡ãããã®ãã¿ãŒã³ã¯ããŸãã¹ã±ãŒãªã³ã°ããŸããã
ãã¿ãŒã³ã®åé¡ã解決ããããã®ãªãã·ã§ã³
ã¿ã€ã ã¢ãŠãå®äº
åé¡ã解決ããæãæçœãªæ¹æ³ã¯ãã¿ã€ã ã¢ãŠãä»ãã§ãããã¹ãæåºããããšã§ãã Akkaã䜿çšãããšãããšãã°æ¬¡ã®ããã«ãããè¡ãããšãã§ããŸãã
åã«ç¢ºèªããPromise
ã¡ãœããã¯ã Promise
ãŸã å®äºãPromise
ããªãå Žåã«ã®ã¿æåããŸãã ãããã®ã¡ãœããããããã¹ã®å®äºåŸã«åŒã³åºããããšããããã¯IllegalStateException
ãã¹ããŒããŸãã çŽæããã§ã«å®äºããŠããå¯èœæ§ãããå Žåã«ãçŽæã®å®äºã詊ã¿ãå¿
èŠãããå Žåãèãããããã®ãšäŒŒãæ¹æ³ããããŸãããååã«try
ãã¬ãã£ãã¯ã¹ããããŸãã çŽæãèªåã§å®äºããå Žåã true
ãè¿ãtrue
ã ãã®ã¡ãœãããåŒã³åºãåã«ãããã¹ãå®äºããå Žåã¯false
ã
ãŸãããªãã·ã§ã³ãšããŠãã¢ã¯ã¿ãŒå
ã§çŽæ¥ã¿ã€ã ã¢ãŠãããããšã§å
ç©ã倱æãããããšãã§ããŸãã
class AlwaysAlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(str, p) => p.completeWith(someFuture()) context.scheduler.scheduleOnce(timeout)(self ! Timeout(p)) case Timeout(p) => p.tryFailure(new SomeTimeoutException) } }
ãã¡ããããã®ãªãã·ã§ã³ã¯ã¹ã±ãŒã©ããªãã£ã®åé¡ã解決ããŸããã
質åãã¿ãŒã³
ãã¡ãããå¥ã®æ¹æ³ã§è¡ãããšãã§ããŸãã ããšãã°ãã¿ã€ã ã¢ãŠããå¿
èŠãªaskãã¿ãŒã³ã䜿çšããŸãã
import akka.pattern.ask import akka.util.Timeout case class Request(arg: String) // def function(arg: String): Future[String] = { implicit val timeout = Timeout(2.seconds) val any: Future[Any] = actor ? Request(arg)
ãã®å Žåãã¢ã¯ã¿ãŒã®å®è£
ã¯ãããã«ç°ãªãå¿
èŠããããŸããçŽæãå®äºãã代ããã«ãã¡ãã»ãŒãžã«è¿ä¿¡ããå¿
èŠããããŸãã
class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) =>
ãã ããå®è£
ã®å®¹æãã«ã¯å±éºã䌎ããŸãã
- çŸåšãã¢ã¯ã¿ãŒããçµäºãããå
ç©ã¯ã¿ã€ãã»ãŒãã§ã¯ãããŸããã ééã£ãåã®å€ã§ãããã¹ãã¯ããŒãºããããšãããšãã³ã³ãã€ã©ãŒã¯æãã€ããããšãã§ããŸãã ãã ãã
Any
ããã®å倿ã®å Žåã¯ä¿è·ãããŸããã - ãã®è°è«ã¯ãã䞻芳çã«èŠãããããããŸããããç§ã¯ããã«å察ããããšã¯ã§ããŸããã
tell
ã䜿çšããèšèštell
ãéåžžã俳åªã¢ãã«ã«ãšã£ãŠçè§£ãããããšèããããŠããŸãã ask
" " , akka-remote, , "" ActorRef
(, remote, ). Akka tell
, ask
, .
Akka Typed
, ask- , , . Akka ask- Future[Any]
. , , :
(actor ? Request(arg)).mapTo[String]
, , String
, . Akka Typed. :
libraryDependencies += "com.typesafe.akka" %% "akka-typed" % "2.5.7"
, :
import akka.typed._ import akka.typed.scaladsl.Actor import akka.typed.scaladsl.AskPattern._ import akka.util.Timeout import scala.concurrent.Future import scala.concurrent.duration._ case class Request(arg: String, replyTo: ActorRef[String]) // val typeSafeActor = Actor.immutable[Request] { (_, msg) => msg.replyTo ! "42"
. , , 2. , - API " ".
, .. ,
, , - , ActorSystem
. :
case class Request(id: Long) case class Response(hotel: Hotel) // API class CallingActor(actor: ActorRef) extends Actor { def receive: Receive = { case response: String => doSomethingWithActorResponse(response) case request: Request => actor ! request
, . API , .
,
Akka , . , , , : 100%-, 100%- , , . , , . , , , .
, , , . , . , :
override def receive: Receive = { case Request(id, p) => p.completeWith(doRequest(id)) p.future.onComplete(self ! Completed) context.become(running) } def doRequest(id: Long): Future[Hotel] = { val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(Uri(uri = uri))) eventual.flatMap { response => val unmarshalled = Unmarshal(response.entity) response.code match { case StatusCodes.OK => unmarshalled.to[Hotel] case _ => Future.failed(new Exception(unmarshalled.to[String])) } } }
:
- ;
- API
flatMap
, onComplete
; - - Akka-HTTP.
, actor.tell()
.
:
- â , JVM.
- ask/tell â Akka, .
- :
+
â ;
± â , , , ; ;
-
â . - â ( ).
- â API, typed ask.
- API â : , , .
| | ask | typed ask | | promise |
---|
| - | + | + | + | - |
ask/tell | tell | ask | ask | tell | tell |
| ± | - | + | ± | ± |
| + | + | + | + | - |
| + | + | - | + | + |
API | + | + | + | - | + |
, , .
, , , , . , , :
- , ;
- , , , ;
- .
, .
- , !