Actor-scala.concurrent.Promise転送パタヌン䜿甚機胜ず代替

さたざたなプロゞェクトをサポヌトする過皋で、 Promise䞍適切な䜜業のために生産Promise問題が発生する状況に䜕床か遭遇したした。 さらに、この非垞に間違った䜜業のパタヌンは垞に同じでしたが、異なる装いに隠れおいたした。 さらに、誀ったコヌドはさたざたな人々によっお曞かれたした。 さらに、 Promiseでの䜜業に関する蚘事では、匷調したい問題の蚀及を本圓に芋぀けたした。 だから私は倚くの人が私が話す問題を忘れおいるず思いたす。


玄束、未来、そしお圹者ずずもに、Scalaで非同期コヌドの倚くの䟋を読むのは面癜いですか 猫ぞようこそ


未来に぀いお少し


手始めに、 Futureに぀いおのちょっずした理論を玹介したす。 Scala暙準ラむブラリのこのタむプに粟通しおいる堎合、この郚分は安党にスキップできたす。


ScalaはFuture[T]型を䜿甚しお非同期蚈算を衚したす。 DBMSからキヌ倀を抜出する必芁があるずしたす。 このようなリク゚ストの同期関数の眲名は次のようになりたす。


 trait SyncKVStore[K, V] { def get(key: K): V } 

その埌、次のように䜿甚できたす。


 val store: SyncKVStore[String, String] = ... val value = store.get("1234") // value   String 

このアプロヌチには欠点がありたす get()メ゜ッドはブロックする可胜性があり、比范的長い時間ネットワヌク䞊でデヌタが送信される可胜性があるためです。 ノンブロッキングにしようずするず、特性は次のようになりたす。


 import scala.concurrent.Future trait KVStore[K, V] { def get(key: K): Future[V] } 

実装を正しく曞き換えるには、䜿甚するDBMSの非同期ドラむバヌを䜿甚する必芁がある可胜性がありたす。 䟋ずしお、マップ䞊にメモリ内実装を蚘述したす。


 import scala.concurrent.ExecutionContext class DummyKVStore(implicit ec: ExecutionContext) extends KVStore[String, String] { // Future(...)       //      ExecutionContext def get(key: String): Future[String] = Future(map(key)) private val map = Map("1234" -> "42", "42 -> 13", "1a" -> "b3") } 

取埗した倀は、たずえば次のように䜿甚できたす。


 //  -    ExecutionContext import scala.concurrent.ExecutionContext.Implicits.global val store = new DummyKVStore // map    ExecutionContext //        store.get("1234").map { value => log.info(value) //  DummyKVStore  42 } 

Futureには、倀を凊理するための䟿利な非同期メ゜ッドがいく぀かありたす;それらのいく぀かを簡単に説明したす。



名前から掚枬できるように、これらすべおのメ゜ッドはimplicit ec: ExecutionContextパラメヌタヌも䜿甚するこずを明確にするこずも䟡倀がありたす。これには、先​​物の実行のコンテキストに関する情報が含たれたす。


先物の詳现に぀いおは、䟋えばこちらをご芧ください 。


玄束理論のさらに2぀の段萜


Promiseずは䜕ですか 実際、これは先物を含む型指定された远蚘型コンテナです。


 val p = Promise[Int]() p.success(42) p.future //  Future   42 

先物が完了するための倚くの方法がありたす。䟋えば



したがっお、Promiseは先物を䜜成するために䜿甚できたす。


玄束䜿甚の実践に没頭する


コヌルバックで既補の非同期Java APIの䞊に独自の非同期APIを実装するこずを想像しおください。 先物で返したい結果はコヌルバックでのみ䜿甚できるため、 Future.apply()メ゜ッドを盎接䜿甚するこずはできたせん。 ここで、 Promiseが圹立ちたす。 SOぞのこの答えには、実䞖界でPromiseを䜿甚した䞀芋玠晎らしい䟋がありたす。


 def makeHTTPCall(request: Request): Future[Response] = { val p = Promise[Response]() registerOnCompleteCallback { buffer => val response = makeResponse(buffer) p.success(response) } p.future } 

さお、Akka-HTTPなどの新しいWebサヌビスでこの関数を䜿甚したす。 開始するには、SBTで䟝存関係を接続したす。


 libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10" 

そしお、サヌビスコヌドを蚘述したす。


 import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshalling.GenericMarshallers.futureMarshaller import scala.concurrent.ExecutionContext.Implicits.global object WebService extends App { //        Akka-HTTP implicit val system = ActorSystem() val store = new DummyKVStore val route = //   GET /value/$id (get & path("value" / IntNumber)) { id => complete { for { value <- store.get(id) response <- makeHTTPCall(new RequestImpl(value)) } yield response } } Http().bindAndHandle(route, "localhost", 80) } 

泚Akka-HTTPのcomplete()メ゜ッドはFuture[T]受け入れるこずができ、このためにfutureMarshallerむンポヌトされたす。 先物の完了埌にHTTPリク゚ストに応答したす。


たた、HTTP APIを介しおデヌタベヌスからのすべおのメヌルに特定のキヌで倀を送信するタスクを冗談にするこずも決定したした。 そしお、圌はサむクルでそれをしたす配垃の完了埌、すべおのクラむアントは再びそれをやり始めたす。


 def allEmails: Seq[String] = ... def sendEmails(implicit ec: ExecutionContext): Future[Unit] = { Future.sequence { for { email <- allEmails } yield for { value <- store.get("42") response <- makeHTTPCall(new SendMailRequest(email, value)) } yield response }.flatMap(_ => sendEmails) //          } 

すべおを運甚環境に配眮したす。 ただし、数日埌、APIコンシェルゞュが私たちのずころに来お、タむムアりトによるサヌビスの定期的なロヌルアりトに぀いお苊情を申し立おたす。 そしお3日埌、タスクがメヌルの送信を停止したこずがわかりたした 問題は䜕ですか


ログには、次のトレヌスがありたす。


 some.package.SomeException at some.package.makeResponse(...) at some.package.$anonfun$makeHTTPCall$1(...) ... 

makeResponse()メ゜ッドが実行をmakeResponse()したこずがmakeResponse() 。 makeHTTPCall()゜ヌスを芋るず、この堎合、それが返す先物は決しお終わらないこずがわかりたす


 val p = Promise[Response] registerOnCompleteCallback(buffer => { val response = makeResponse(buffer) //     p.success(response) //  success()   }) p.future 

これが、タむムアりトによっおAPIが萜ち、メヌル送信サむクルが機胜しなくなった理由です。 残念ながら、Scalaでは、 倚くの人が望むように、どの関数も実行を返すこずができるず考えずにプログラミングするこずはできたせん ...


そのため、 Try.apply()メ゜ッドは実行をむンタヌセプトし、倀でSuccessを返すか、䟋倖をスロヌしおFailureを返すこずができるこずを思い出しおTry.apply() 。 ラムダ本䜓を単玔な方法で修正し、コヌドにレビュヌを送信したす。


 import scala.util._ Try(makeResponse(buffer)) match { case Success(r) => p.success(r) case Failure(e) => p.failure(e) } 

ただし、レビュヌでは、promiseにはcomplete()メ゜ッドがあり、それ自䜓が手で曞いたものず同じこずを行うこずがわかりたす。


 p.complete(Try(makeResponse(buffer)) 

Promiseに぀いお孊んだこず


  1. Promiseメ゜ッドの最初に宣蚀され、そのfuturesが最埌に返される堎合、これはこのfuturesが終了するこずを意味するものではありたせん。
  2. Promiseを垞に閉じなければならないリ゜ヌスず考えるず䟿利です。 ただし、プロミスは通垞、異なるスレッドで宣蚀されお閉じられるため、暙準蚀語構成 try-finally たたはラむブラリ scala-arm を䜿甚しおリ゜ヌスずしお䜿甚するこずには問題がありたす。

おそらく誰かが、これは人為的な䟋であり、実生掻では誰も玄束を忘れるこずを忘れないず蚀うでしょうか たあ、そのような懐疑論者のために、私はAkkaのいく぀かの本圓のバグ/ PRの圢で答えを持っおいたす。


  1. QueueSourceは、突然の終了時にonComplete futureを完了したせん
  2. IgnoreSinkは、突然の終了時にmat futureを完了したせん
  3. 完了したキュヌのオファヌを呌び出すず、決しお完了できない未来が返されたす

さらに、すべおがこの䟋のように単玔で明癜であるずは限りたせん。


理論の最埌の郚分俳優ぞの小さな玹介


アクタヌに粟通しおいる人は、このパヌトをスキップできたす。


この蚘事の埌半で、Akkaアクタヌに぀いお少し知識が必芁になりたす。 akka-actorモゞュヌルをプロゞェクトに接続したすSBTの䟋


 libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.7" 

Akkaのアクタヌは、非同期的にメッセヌゞを受信するいく぀かの動䜜を持぀オブゞェクトです。 デフォルトでは、動䜜はreceiveメ゜ッドで決定されreceive 。


 import akka.actor._ import akka.actor.Actor.Receive val log: Logger = ourLogObject() case object HelloRequest class HelloActor extends Actor { // Receive --     def receive: Receive = { //    HelloRequest,    "hello!" case HelloRequest => log.info("hello!") } } //       val system = ActorSystem() //       val actor: ActorRef = system.actorOf(Props(new HelloActor)) //   "hello!"   actor ! HelloRequest 

䜜成埌、アクタヌは盎接アクセスできたせんが、 ActorRefず呌ばれるプロキシを介しおアクセスできたす。 このプロキシを介しお、メ゜ッドを䜿甚しお非同期でメッセヌゞを送信できたす!  tell゚むリアス、これらのメッセヌゞは、アクタヌの動䜜を決定するメ゜ッドによっお凊理されたす。 メッセヌゞはシリアラむズ可胜でなければならないため、倚くのcase object 、メッセヌゞ甚のcase object䜜成されたすメッセヌゞのパラメヌタヌがないcase class 。 アクタヌは䞀床に1぀のメッセヌゞしか凊理できないため、同期プリミティブずしおも䜿甚できたす。


もう1぀の重芁なポむントがありたす。アクタヌはその動䜜機胜を倉曎できる、぀たり実際にメッセヌゞを凊理できたす。 これを行うには、アクタヌはcontext.become(newReceive)メ゜ッドを呌び出す必芁がありたすnewReceiveはReceiveパラメヌタヌです。 その埌、次のメッセヌゞから開始しお、デフォルトのreceive代わりにnewReceive関数による凊理が開始されたす。


パタヌンの各郚分を接続する玄束を俳優に枡す


それでは、次の䟋に移りたしょう。


䜕らかのサヌビスのクラむアントを䜜成する必芁がありたす。 たずえば、予玄甚。 idでホテル情報を受信できるようにしたいずしたす。


 case class Hotel(id: Long, name: String, country: String) //    trait BookingClient { def getHotel(id: Long): Future[Hotel] } 

次に、予玄APIにアクセスしお応答を凊理するメ゜ッドを定矩する必芁がありたす。 これを行うには、Akka-HTTPラむブラリの非同期HTTPクラむアントを䜿甚したす。 䟝存関係で接続したす


 libraryDependencies ++= "com.typesafe.akka" %% "akka-http" % "10.0.10" 

圌らは、比范的倧きなRPSで短時間メ゜ッドを実行したいのですが、応答時間はそれほど重芁ではありたせん。 Akka-HTTPクラむアントには特異性がありたす akka.http.host-connection-pool.max-connections芁求以䞊の䞊列実行を蚱可したせん 。 非垞に単玔な゜リュヌションを䜿甚したす。すべおのリク゚ストがアクタヌを通過するようにしたす。぀たり、1぀のストリヌムになりたす実際の゜リュヌションはもう少し耇雑でしたが、これはこの䟋では重芁ではありたせん。 先物を返したいので、玄束を䜜成しおアクタヌに枡したす。すでにアクタヌでコミットしたす。


 // implicit system  materializer     Akka-HTTP; ec   Future.foreach() class HttpBookingClient(baseUri: String)(implicit system: ActorSystem, materializer: ActorMaterializer, ec: ExecutionContext) { override def getHotel(id: Long): Future[Hotel] = { val p = Promise[Hotel]() actor ! Request(id, p) p.future } private val actor = system.actorOf(Props(new ClientActor)) private case class Request(id: Long, p: Promise[Hotel]) private case object Completed private class ClientActor extends Actor { //   ,      override def receive: Receive = { case Request(id, p) => val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(uri = uri)) //        API eventual.foreach { response => p.completeWith { val unmarshalled = Unmarshal(response.entity) //       200,      response.status match { case StatusCodes.OK => //   Unmarshaller[Hotel], ,   akka-http-spray-json //          unmarshalled.to[Hotel] case _ => unmarshalled.to[String].flatMap(error => Future.failed(new Exception(error))) } }) } p.future.onComplete(_ => self ! Completed) //      running context.become(running) } //  ,       //      private def running: Receive = { case request: Request => //       ,   ;    self ! request case Completed => //       context.become(receive) } } } 

繰り返しになりたすが、すべおを出した埌、最初はすべおうたくいきたしたが、「 getHotel() Method Returns Incomplete Futures」ずいう芋出しのバグレポヌトがありたした。 なぜこれが起こったのですか 私たち党員が想像したように芋えたすが、ラムダ本䜓党䜓に察しおcompleteWith()メ゜ッドを䜿甚したした...それにもかかわらず、特定の条件䞋では、先物はただ残っおいたす。


問題は、 foreach()メ゜ッドに枡されるラムダは、最終的なeventual正垞に完了したずきにのみ開始されるこずです。 したがっお、この先物が倱敗した堎合たずえば、ネットが萜ちた堎合、玄束は決しおneverめられたせん


修正は比范的簡単であるず想定できたすonComplete()代わりにonComplete()を䜿甚し、枡されたラムダの゚ラヌを凊理する必芁がありたす。 このようなもの


 eventual.onComplete { case Success(response) => //    ,     foreach... case Failure(e) => p.failure(e) } 

これにより、特にフュヌゞョンのフュヌゞョンによるフュヌチャヌ先物の問題が解決されたすが、この方法で送信されたフュヌチャヌ先物の問題をすべお解決できるわけではありたせん。


さらなる掚論を単玔化するために、将来を完了するためにアクタヌに玄束を転送する、より単玔な、同時により䞀般的な䟋を実装したす。


 case class Request(arg: String, p: Promise[String]) trait GimmeActor { val actor: ActorRef def function(arg: String): Future[String] = { val p = Promise[String]() actor ! Request(arg, p) p.future } } 

ちなみに、ボディfuncion()の構築は、たずえばAkkaや他のラむブラリの゜ヌスコヌドによく芋られたす。 同じAkkaには、このパタヌンに埓っお曞かれたPromise数十の䜿甚法がありたす。


  1. Promise䜜成
  2. 非同期関数ぞの匕数の1぀ずしお枡したすAkkaの゜ヌスでは、これは倚くの堎合、単にactor.tell()呌び出しです
  3. Promiseによっお䜜成されたfutureフィヌルドを返したす。

明確にするためのいく぀かの䟋


  1. ここでは、L129のコヌルバックで、 pからアクタヌぞの転送のみが行われたす。
  2. そしおここで、アクタヌを䜜成するためのパラメヌタヌにプロミスが盎接送信されたす。

このパタヌンの䜿甚には、少なくずもいく぀かの問題がありたす。



 class DoesntEvenKnowAboutRequest extends Actor { def receive: Receive = { case PoisonPill => } } class HandlesRequestWrongWay extends Actor { def receive: Receive = { case Request(arg, p) => } } 

どちらの堎合も、玄束は明らかに完了したせん。


䞀方では、これらの䟋はあたりにも合成的であるず蚀えたす。 䞀方、コンパむラはこのような゚ラヌから私たちを保護したせん。 さらに、この問題はアクタヌだけでなく、Promiseを通垞の非同期関数に転送する堎合にも芚えおおく必芁がありたす。



 class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg, p) => p.success("42") } 

私たちはすべお順調で、このアクタヌを䜿甚したサヌビスは完璧に機胜したす。 ただし、最初に1぀のサヌバヌでサヌビスを開始するこずにしたした。


しかし、今では䌚瀟も成長しおおり、ナヌザヌの数も増えおいたす。それに䌎い、アクタヌぞのリク゚ストの数も増えおいたす。 負荷のピヌク時に、アクタヌは必芁な時間内にメッセヌゞフロヌに察凊する時間がなくなったため、氎平スケヌリングを行うこずにしたした。クラスタヌ内の異なるノヌドでAlwaysCompletesRequestを実行したす。 クラスタヌを敎理するには、akka-clusterを䜿甚する必芁がありたすが、簡単にするためにクラスタヌを敎理せず、1぀のリモヌトアクタヌAlwaysCompletesRequest芋おAlwaysCompletesRequest 。


䞡方のJVMでakka-remoteをサポヌトするActorSystemを䜜成する必芁がありたすアクタヌぞのアクセスずホスト。 これを行うにはapplication.conf䞡方のサヌビスのapplication.conf次の構成を远加したす。


 akka { actor { provider = remote } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "some-hostname" port = 2552 #     ;      -  } } } 

SBTの䟋のように、䞡方のサヌビスにakka-remote䟝存関係を远加する必芁もありたす。


 libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.7" 

サヌバヌ䞊でアクタヌを䜜成したす


 val system = ActorSystem("server") system.actorOf(Props(new AlwaysCompletesRequest), "always-complete-actor") 

次に、クラむアントで取埗したす。


 val system = ActorSystem("client") val actor = system.actorSelection("akka.tcp://server@some-hostname:2552/user/promise-actor") val gimme = new GimmeActor { override val actor: ActorRef = actor } val future = gimme.function("give me 42, please!") 

そしお、クラむアントずずもにサヌバヌを起動したした...そしおすぐに行き詰たりたした。


ログに次の䟋倖がありたす。


 akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.actor.ActorSelectionMessage] using serializer [class akka.remote.serialization.MessageContainerSerializer]. Caused by: java.io.NotSerializableException: scala.concurrent.impl.CallbackRunnable 

したがっお、リモヌトアクタヌにプロミスを送信しようずするず、プロミスのシリアル化゚ラヌが非垞に予想どおりに発生したした。 実際、たずえシリアル化しお玄束を枡すこずができたずしおも、リモヌトJVMでゞャムになるだけで、JVMではスタックしたたたになりたす。 したがっお、アクタヌにプロミスを転送するこずはロヌカルメッセヌゞングでのみ機胜したす。぀たり、アクタヌにプロミスを送信するこのパタヌンはうたくスケヌリングしたせん。


パタヌンの問題を解決するためのオプション


タむムアりト完了


問題を解決する最も明癜な方法は、タむムアりト付きでプロミスを提出するこずです。 Akkaを䜿甚するず、たずえば次のようにこれを行うこずができたす。


 //  .seconds import scala.concurrent.duration._ val system: ActorSystem = ... val timeout = 2.seconds //         system.scheduler.scheduleOnce(timeout)(p.tryFailure(new SomeTimeoutException)) 

前に確認したPromiseメ゜ッドは、 Promiseただ完了しPromiseいない堎合にのみ成功したす。 これらのメ゜ッドがプロミスの完了埌に呌び出されるず、それらはIllegalStateExceptionをスロヌしたす。 玄束がすでに完了しおいる可胜性がある堎合に、玄束の完了を詊みる必芁がある堎合、考えられるものず䌌た方法がありたすが、名前にtryプレフィックスがありたす。 玄束を自分で完了した堎合、 trueを返しtrue 。 このメ゜ッドを呌び出す前にプロミスが完了した堎合はfalse 。


たた、オプションずしお、アクタヌ内で盎接タむムアりトするこずで先物を倱敗させるこずができたす。


 class AlwaysAlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(str, p) => p.completeWith(someFuture()) context.scheduler.scheduleOnce(timeout)(self ! Timeout(p)) case Timeout(p) => p.tryFailure(new SomeTimeoutException) } } 

もちろん、このオプションはスケヌラビリティの問題を解決したせん。


質問パタヌン


もちろん、別の方法で行うこずもできたす。 たずえば、タむムアりトが必芁なaskパタヌンを䜿甚したす。


 import akka.pattern.ask import akka.util.Timeout case class Request(arg: String) //      def function(arg: String): Future[String] = { implicit val timeout = Timeout(2.seconds) val any: Future[Any] = actor ? Request(arg) // timeout    ? any.mapTo[String] } 

この堎合、アクタヌの実装はわずかに異なる必芁がありたす。玄束を完了する代わりに、メッセヌゞに返信する必芁がありたす。


 class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) => //   :       sender() ! "42" } 

ただし、実装の容易さには危険が䌎いたす。


  1. 珟圚、アクタヌを「終了」する先物はタむプセヌフではありたせん。 間違った型の倀でプロミスをクロヌズしようずするず、コンパむラヌは手を぀かむこずができたす。 ただし、 Anyからの型倉換の堎合は保護されたせん。
  2. この議論はやや䞻芳的に芋えるかもしれたせんが、私はそれに反察するこずはできたせん。 tellを䜿甚した蚭蚈tell 、通垞、俳優モデルにずっお理解しやすいず考えられおいたす。
  3. ask " " , akka-remote, , "" ActorRef (, remote, ). Akka tell , ask , .

Akka Typed


, ask- , , . Akka ask- Future[Any] . , , :


 (actor ? Request(arg)).mapTo[String] 

, , String , . Akka Typed. :


 libraryDependencies += "com.typesafe.akka" %% "akka-typed" % "2.5.7" 

, :


 import akka.typed._ import akka.typed.scaladsl.Actor import akka.typed.scaladsl.AskPattern._ import akka.util.Timeout import scala.concurrent.Future import scala.concurrent.duration._ case class Request(arg: String, replyTo: ActorRef[String]) //    val typeSafeActor = Actor.immutable[Request] { (_, msg) => msg.replyTo ! "42" //    :        Actor.same } val system: ActorSystem[Request] = ActorSystem(typeSafeActor, "type-safe-actor") def function(arg: String): Future[String] = { implicit val timeout: Timeout = Timeout(2.seconds) system ? (Request(arg, _)) } 

. , , 2. , - API " ".


, .. ,


, , - , ActorSystem . :


 case class Request(id: Long) case class Response(hotel: Hotel) //    API class CallingActor(actor: ActorRef) extends Actor { def receive: Receive = { case response: String => doSomethingWithActorResponse(response) case request: Request => actor ! request //         } } //   class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) => //    sender() ! "42" } 

, . API , .


,


Akka , . , , , : 100%-, 100%- , , . , , . , , , .


, , , . , . , :


 override def receive: Receive = { case Request(id, p) => p.completeWith(doRequest(id)) p.future.onComplete(self ! Completed) context.become(running) } def doRequest(id: Long): Future[Hotel] = { val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(Uri(uri = uri))) eventual.flatMap { response => val unmarshalled = Unmarshal(response.entity) response.code match { case StatusCodes.OK => unmarshalled.to[Hotel] case _ => Future.failed(new Exception(unmarshalled.to[String])) } } } 

:


  1. ;
  2. API flatMap , onComplete ;
  3. - Akka-HTTP.

, actor.tell() .


:



asktyped askpromise
-+++-
ask/telltellaskasktelltell
±-+±±
++++-
++-++
API+++-+

, , .


, , , , . , , :


  1. , ;
  2. , , , ;
  3. .

, .


- , !



Source: https://habr.com/ru/post/J344692/


All Articles