Elixir GenStageでDiscordが1分あたり1,000,000を超えるプッシュリクエストを処理する方法


不和

不和は前例のない成長を経験しました。 それに対処するために、開発部門には、バックエンドサービスを拡張する方法を探すという素晴らしい問題がありました。

このビジネスでは、Elixir GenStageと呼ばれる1つのテクノロジーを使用して大きな成功を収めました。

パーフェクトストーム:オーバーウォッチとポケモンGO


この夏、モバイルプッシュ通知システムは負荷からきしみ始めました。 チャット/ r /オーバーウォッチは25,000人の同時ユーザーを超え、ポケモンGOチャットグループはいたるところに存在していたため、通知ストリームの突然の急増が深刻な問題になりました。

通知ストリームのバーストは、プッシュ通知システム全体を禁止し、場合によっては配置します。 プッシュ通知は遅れるか、まったく届きません。

GenStageは救助に行きます


少し調査したところ、主なボトルネックはプッシュ通知をGoogle Firebase Cloud Messagingサービスに送信することであることがわかりました。

HTTPではなくXMPP経由でFirebaseにプッシュリクエストを送信することで、すぐにスループットを改善できることに気付きました。

Firebase XMPPは、HTTPよりもやや複雑です。 Firebaseでは、各XMPP接続が常に100を超えるリクエストをキューに入れていないことが必要です。 100件のリクエストがあなたから流れ去った場合、Firebaseがリクエストの受信を確認するまで待ってから次のリクエストを送信する必要があります。

キューでは一度に100件のリクエストしか許可されないため、リクエストストリームのバースト中にXMPP接続がオーバーフローしないように新しいシステムを設計する必要がありました。

一見すると、GenStageが問題の完全な解決策になると思われました。

GenStage


GenStageとは何ですか?

GenStageは、Elixirプロセス間で背圧イベントを交換するための新しいElixir動作モードです。 [ 0 ]

これはどういう意味ですか? 基本的に、このモードは、システムのどの部分も過負荷にならないように、必要なツールを提供します。

実際には、GenStageモードのシステムには通常、いくつかのステップがあります。

ステージは、他のステージからデータを送信および/または受信する計算のステージです。

ステージがデータを送信すると、プロデューサーとして機能します。 データを受信すると、消費者として。 ステージは、製造業者と消費者の両方の役割を果たすことができます。

プロデューサーとコンシューマーにロールを割り当てることに加えて、要素のみを生成する場合は「ソース」(ソース)として、要素を消費するだけの場合はシンクとしてステージを指定できます。 [ 1 ]

アプローチ




システムをGenStageの2つの段階に分割しました。 1つのソースと1つのストック。


GenStageによる背圧と負荷制限


GenStageには、リクエストの急増時にバックプレッシャーと負荷制限という2つの重要な機能があります。

背圧


プッシャーはGenStage機能を使用して、プッシャーが処理できるリクエストの最大数をプッシュコレクターにリクエストします。 これにより、保留中のプッシュ要求の数の上限が保証されます。 Firebaseがリクエストを確認しても、Pusherはプッシュコレクターを必要とします。

プッシャーは、Firebase XMPP接続が処理できるリクエストの正確な数を知っています。 ただし、Push Collectorは、Pusherが要求しない限り、Pusherにリクエストを送信しません。

負荷制限


プッシャーがプッシュコレクターに背圧をかけると、プッシュコレクターに潜在的なボトルネックが発生します。 超強力な強力なバーストはそれを過負荷にすることができます。

GenStageには、このような状況のための別の組み込み関数があります。それは、バッファーされたイベントです。

プッシュコレクターでは、バッファリングするプッシュリクエストの数を決定します。 通常の状態では、バッファーは空ですが、壊滅的なイベントが発生する月に1回は便利です。

多くのイベントがシステムを通過し、バッファーがいっぱいになると、Push Collectorは着信プッシュ要求をリセットします。 これは、 initプッシュコレクター関数でbuffer_sizeオプションを指定するだけでbuffer_size発生します。

これらの2つの機能を使用して、プッシュ通知のバーストに対処できます。

コード(最後に、最も重要な部分)


以下は、プッシャーおよびプッシュコレクターのステップを設定する方法のサンプルコードです。 簡単にするために、接続が失われたとき、Firebaseがエラーを返すなど、フェイルオーバーの原因となる多くのフラグメントを削除しました。

結果を確認する場合は、コードをスキップできます。

プッシュコレクター(メーカー)


push_collector.ex

 defmodule GCM.PushCollector do use GenStage # Client def push(pid, push_requests) do GenServer.cast(pid, {:push, push_requests}) end # Server def init(_args) do # Run as producer and specify the max amount # of push requests to buffer. {:producer, :ok, buffer_size: @max_buffer_size} end def handle_cast({:push, push_requests}, state) do # Dispatch the push_requests as events. # These will be buffered if there are no consumers ready. {:noreply, push_requests, state} end def handle_demand(_demand, state) do # Do nothing. Events will be dispatched as-is. {:noreply, [], state} end end 

プッシャー(消費者)


pusher.ex

 defmodule GCM.Pusher do use GenStage # The maximum number of requests Firebase allows at once per XMPP connection @max_demand 100 defstruct [ :producer, :producer_from, :fcm_conn_pid, :pending_requests, ] def start_link(producer, fcm_conn_pid, opts \\ []) do GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts) end def init({producer, fcm_conn_pid}) do state = %__MODULE__{ next_id: 1, pending_requests: Map.new, producer: producer, fcm_conn_pid: fcm_conn_pid, } send(self, :init) # Run as consumer {:consumer, state} end def handle_info(:init, %{producer: producer}=state) do # Subscribe to the Push Collector GenStage.async_subscribe(self, to: producer, cancel: :temporary) {:noreply, [], state} end def handle_subscribe(:producer, _opts, from, state) do # Start demanding requests now that we are subscribed GenStage.ask(from, @max_demand) {:manual, %{state | producer_from: from}} end def handle_events(push_requests, _from, state) do # We got some push requests from the Push Collector. # Let's send them. state = Enum.reduce(push_requests, state, &do_send/2) {:noreply, [], state} end # Send the message to FCM, track as a pending request defp do_send(%{fcm_conn_pid: fcm_conn_pid, pending_requests: pending_requests}=state, push_request) do {message_id, state} = generate_id(state) xml = PushRequest.to_xml(push_request, message_id) :ok = FCM.Connection.send(fcm_conn_pid, xml) pending_requests = Map.put(pending_requests, message_id, push_request) %{state | pending_requests: pending_requests} end # FCM response handling defp handle_response(%{message_id: message_id}=response, %{pending_requests: pending_requests, producer_from: producer_from}=state) do {push_request, pending_requests} = Map.pop(pending_requests, message_id) # Since we finished a request, ask the Push Collector for more. GenStage.ask(producer_from, 1) %{state | pending_requests: pending_requests} end defp generate_id(%{next_id: next_id}=state) do {to_string(next_id), %{state | next_id: next_id + 1}} end end 

事件の例

以下は、システムで発生した実際のインシデントです。 上のグラフは、システムを通過する1秒あたりのプッシュ要求の数を示しています。 下のグラフは、プッシュコレクターバッファーに配置されたプッシュ要求の数を示しています。





イベントのクロニクル:


成功エリクサー


Discordでは、ElixirとErlangをバックエンドサービスの重要なテクノロジーとして使用することに非常に満足しています。 破壊不可能なErlang / OTPテクノロジーに依存するGenStageのような拡張機能を見るのは素晴らしいことです。

Discordが成長し続ける中、私たちはそのような問題を解決するために勇気ある精神を求めます。 あなたがゲームが好きで、この種のタスクがあなたの心臓の鼓動を速くするなら、 私たちの空室を見てください

Source: https://habr.com/ru/post/J317724/


All Articles