Vertxの非同期プログラミングの3つのパラダイム

非同期プログラミングの3つのパラダイム-コールバック、フューチャー、Vertxフレームワーク上の単純なWebアプリケーションの例のコルーチンを示したいと思います。 コードはKotlinで記述します。

HTTPリクエストで特定の文字列を受信するアプリケーションがあり、データベースでURLを検索し、そのURLに移動して、そのコンテンツをクライアントに送り返すとします。
Vertxは、負荷の高いアプリケーション用の非同期フレームワークとして構想され、netty、新しいIO、イベントバスを使用します

Vertxの慣例に従い、1つのVerticle(Akkaを知っている場合はアクターの類似物)がリクエストを受信し、イベントバス文字列を送信して、実際に作業に従事している他のBusinessVerticleのURLを検索します。

object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } } 

 class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } } 

標準APIでは、すべての非同期はコールバックを通じて行われるため、BusinessVerticleの初期実装は次のようになります。

 class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } } 

率直に言って、まあまあ-コールバック地獄、特にエラー処理。

コールバックの第一人者が教えてくれるように、状況を改善してみましょう-個別のメソッドで各コールバックを選択することにより:

  class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) { // Obtain response val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } 

まあ、それは良くなりました。 しかし、まあまあ。

特に読みやすいコードではない多くの行では、応答のためにメッセージオブジェクトをドラッグする必要があります。エラー処理はコード全体に広がります。

Futuresを使用してこのがらくたを書き換えてみましょう
Futureは、 Future.compose()を使用して簡単に結合できるため、特に優れています。

最初に、コールバックを受け入れ、Futureを返すメソッドに何も返さない標準のVertxメソッドを変換します。

既存のクラスにメソッドを追加するKotlinの機能を利用します。

 fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f } 

そして、BusinessVerticle.handleMessageを次のようにします。

  private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { message.fail(500, res.cause().message) } } } private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } 

かっこいい。

シンプルで読みやすいコード。 一箇所でのエラー処理。 必要に応じて、異なる例外に対して異なる反応をさせることができます。たとえば、それを別の関数に入れることができます。

詩人の夢!

しかし、何らかの条件でFutureチェーンを終了する必要がある場合はどうなりますか?
たとえば、データベースに対応するエントリがない場合、例外(およびクライアントにコード500)をスローするのではなく、コード200で文字列「レコードなし」を返します。

Future.compose()からチェーンを終了する唯一の方法(私が知っているは、例外をスローすることです。

つまり このようなことをする必要があります:例外のタイプを決定し、データベースにエントリがなければこの例外をスローし、この例外を特別な方法で処理します。

 class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { if (res.cause() is NoContentException) message.reply(res.cause().message) else message.fail(500, res.cause().message) } } } 

うまくいく!

しかし、すでに悪化しているように見えます。例外を使用して実行フローを制御することは美しくありません。 また、個別の処理を必要とするこのようなケースが多数ある場合、コードははるかに読みにくくなります。

Kotlinコルーチンで同じことを試してみましょう。
Habré( 1、2、3 、...)を含め、コルーチンについて多くのことが書かれているので、それらについては別に書きません。

Vertxの最新バージョンは、コールバックが受け入れるすべてのメソッドのコルーチンバージョンを自動的に生成します。

図書館をつなぐ
「vertx-lang-kotlin-coroutines」
「vertx-lang-kotlin」

そして、例えば、取得

 JDBCClient.getConnectionAwait() SQLConnection.queryAwait() 

など

その後、メッセージ処理メソッドは、キュートでシンプルなものに変わります。

 private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

さて、コルーチンコンテキストを提供して呼び出しを変更する必要があります。

 vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} } 

ここで何が起こっていますか?

Awaitを使用したこれらのメソッドはすべて、コードを非同期的に呼び出し、その結果を待機し、待機中にスレッドが別のコルーチンの実行に切り替えます。

ボンネットの下を見ると、次のようになります。

 suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } } 

Futuresによる自己記述の実装に似たもの。

しかし、ここでは通常のコードを取得します-戻り値の型として(Futureの代わりに)文字列、AsyncResultでいコールバックの代わりにtry / catch

実行チェーンを途中で停止する必要がある場合、例外なく自然に見えます。

  private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

私の意見では、素晴らしい!

Source: https://habr.com/ru/post/J449092/


All Articles