Stormフレームワークの学習。 パートI

2011年、TwitterはEclipse Public Licenseの下で、 Storm Distributed Computingプロジェクトを開始しました。 StormはBackTypeで作成され、購入後にTwitterに切り替えられました。

Stormは、 Apache Hadoopに似ていますが、リアルタイムの大規模なデータストリームの分散処理に焦点を当てたシステムです。

Stormの主な機能:

最初の部分では、Stormバージョン0.8.2を使用してアプリケーションを作成する基本概念と基本について説明します。

嵐の要素


タプル
データ表示要素。 デフォルトでは、Long、Integer、Short、Byte、String、Double、Float、Boolean、byte []フィールドを含めることができます。 Tupleで使用されるカスタム型はシリアル化可能でなければなりません。

ストリーム
タプルのシーケンス。 Tupleのフィールドの命名スキームが含まれています。

注ぎ口
ストリームのデータプロバイダー。 外部ソースからデータを受信し、それらからタプルを形成し、それをストリームに送信します。 Tupleを複数の異なるストリームに送信できます。 RabbitMQ / AMQPKestrelJMSKafkaなどの一般的なメッセージングシステムに対応しています。

ボルト
データハンドラ。 入力はTupleです。 0個以上のTupleを出力に送信します。

トポロジー
関係の説明を持つ要素のコレクション。 HadoopのMapReduceジョブに類似しています。 MapReduceジョブとは異なり、入力データストリームが使い果たされた後も停止しません。 注ぎ口要素とボルト要素の間でタプル輸送を実行します。 ローカルで起動するか、Stormクラスターにロードできます。

使用例


挑戦する


Cdr電話データストリームがあります。 ソース番号に基づいて、クライアントIDが決定されます。 宛先番号と顧客IDに基づいて、料金が決定され、通話コストが考慮されます。 各ステージは複数のスレッドで動作するはずです。
この例はローカルマシンで実行されます。

実装


開始するには、 BasicApp入力を印刷するだけです。

新しいトポロジを作成します。
TopologyBuilder builder = new TopologyBuilder(); 

入力を生成するスパウトCdrSpoutを追加します。
 builder.setSpout("CdrReader", new CdrSpout()); 

2つのストリームでBoltを追加し、出力ストリームがCdrReaderであることを示します。 shuffleGroupingは、CdrReaderからのデータがランダムに選択されたPrintOutBoltに供給されることを意味します。
 builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader"); 

ローカルStormクラスターを構成して開始します。
 Config config = new Config(); //     config.setDebug(false); LocalCluster cluster = new LocalCluster(); //   Storm  cluster.submitTopology("T1", config, builder.createTopology()); //  Topology Thread.sleep(1000*10); cluster.shutdown(); //   

出力はおよそ次のとおりです。
非表示のテキスト
 OUT >> [80] Cdr {callSource = '78119990005'、callDestination = '8313610698077174239'、 
 callTime = 7631、clientId = 0、price = 0}
 OUT >> [78] Cdr {callSource = '78119990006'、callDestination = '2238707710336895468'、 
 callTime = 20738、clientId = 0、price = 0}
 OUT >> [78] Cdr {callSource = '78119990007'、callDestination = '579372726495390920'、 
 callTime = 31544、clientId = 0、price = 0}
 OUT >> [80] Cdr {callSource = '78119990006'、callDestination = '2010724447342634423'、 
 callTime = 10268、clientId = 0、price = 0}

角括弧内の数字はスレッドIDです。処理が並行して実行されていることがわかります。

さらなる実験のために、いくつかのハンドラー間での入力データの分布に対処する必要があります。
上記の例では、ランダムなアプローチが使用されました。 しかし、実際の使用では、Boltはおそらく外部ヘルプシステムとデータベースを使用します。 この場合、各Boltが入力データの独自のサブセットを処理することが望ましいです。 その後、外部システムからのデータの効果的なキャッシュを整理することが可能になります。

これを行うために、StormはCustomStreamGroupingインターフェイスを提供します。
CdrGrouperをプロジェクトに追加します。 そのタスクは、同じソース番号のタプルを同じボルトに送信することです。 これを行うために、CustomStreamGroupingは2つの呼び出しを提供します。
prepare-最初に使用する前に呼び出されます:
 @Override public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) { tasks = new ArrayList<>(integers); //   Bolts } 

そしてchooseTasks - Tupleからのリストが入力され、 Tupleリスト内の各位置のボルト番号で構成されるリストが返されます:
 @Override public List<Integer> chooseTasks(int i, List<Object> objects) { List<Integer> rvalue = new ArrayList<>(objects.size()); for(Object o: objects) { Cdr cdr = (Cdr) o; rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) % tasks.size())); } return rvalue; } 

shuffleGroupingをCdrGrouper BasicGroupAppに置き換えます。
 builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2). customGrouping("CdrReader", new CdrGrouper()); 

実行して、意図したとおりに機能することを確認します。
非表示のテキスト
 OUT >> [80] Cdr {callSource = '78119990007'、callDestination = '3314931472251135073'、 
 callTime = 17632、clientId = 0、price = 0}
 OUT >> [80] Cdr {callSource = '78119990007'、callDestination = '4182885669941386786'、 
 callTime = 31533、clientId = 0、price = 0}


次に、プロジェクトに追加します。
ClientIdBolt-ソース番号によってクライアントIDを識別します。
ClientIdGrouper-クライアントIDでグループ化します。
RaterBolt-請求に従事しています。
CalcAppはプログラムの最終バージョンです。

トピックが興味深い場合は、次のパートで、データ損失と実際のクラスターでの実行に対する保護メカニズムについてお話したいと思います。 コードはGitHubで入手できます

PS。 もちろん、曲から言葉を放り出すことはありませんが、データプロセッサの名前「ボルト」はやや紛らわしいです:)

UPD。 記事の2番目の部分が公開されています。

Source: https://habr.com/ru/post/J186208/


All Articles