記事の
第2部では、処理中にエラーを検出するメカニズムについて説明しました。
処理がエラーで失敗しました。次に何をしますか? クラスタノードの1つとの通信が失われたり、データベースが一時的に利用できなくなったりする可能性があります。 この場合、どの操作が正常に実行され、どの操作が失敗したかを確実に言うことはできません。 フラグを設定するなど、チェーン内のすべての操作が再適用可能(べき
等 )である場合は、単純に処理を再開できます。 そうでない場合は、Stormトランザクションメカニズムが役立ちます。
トランザクションの特性について説明すると、
ACIDという用語がすぐに表示されます。
- トミシティ(原子性)。 トランザクション中にシステムで行われたすべての変更は、完全に適用されるか、まったく適用されません。
- 一貫性。 トランザクションは、システムをある一貫した状態から別の状態に移行します。
- 私は分離(分離)。 同時に実行されるトランザクションは、互いの作業の結果に影響しません。
- 耐久性(信頼性)。 トランザクションの変更はシステムに残ることが保証されています。
一貫性と耐久性はデータベースにより関連しています。 原子性と分離に興味があります。
バージョン0.8.0で、Stormは
Apache Pigに類似した
Tridentサブシステムを導入しました。
トランザクショントポロジ機能が移行されました。
Stormのトランザクション
トミシティ
トポロジは、データベースとの作業をカプセル化する
Stateインターフェースを実装するオブジェクトを作成します。 Spoutへの入力はTupleに分割され、バッチパケットで収集されます。 バッチは一意のトランザクションIDに関連付けられています。 タプル形成バッチは並行して処理できます。
処理チェーンの最後に、単一のトランザクションに関連する
Tupleのセットが 、
StateUpdaterインターフェイスを実装するクラスの
updateStateメソッドに渡され、State
の変更が
生成されます。 正常に完了すると、Spoutはバッチ処理の成功に関する通知を受け取ります。 エラーが発生した場合、Spoutは処理のためにバッチ全体を再度送信する必要があります。
したがって、Stormは、バッチが完全に1回だけデータベースにコミットされるようにします。
私はソレーション
Stormは、バッチがトランザクションIDの昇順で厳密に連続してStateUpdaterに送信されることを保証します。 つまり、バッチ#2は、バッチ#1が正常に修正された後にのみ修正されます。
実装
トランザクション
スパウトは、
ICommitterTridentSpout <TransactionMetadata>インターフェイスを実装する必要があります。 TransactionMetadata-Batchを生成し、次のトランザクションを生成するためのデータを含むクラス:
TxMeta 。
非表示のテキストpublic class TxMeta { private int start; private int count; public TxMeta(int start, int count) { this.start = start; this.count = count; }
ITridentSpout.BatchCoordinator <TransactionMetadata>インターフェースを実装するクラスは、トランザクションを作成するときにTransactionMetadataを初期化し、次のトランザクションの準備ができている場合は要求に応答します:
TridentTxSpout 。 トポロジごとに1つのコピーで作成されます。
非表示のテキスト static class BCoordinator implements BatchCoordinator<TxMeta> { private static final int TRANSACTION_COUNT = 5; private static final int TRANSACTION_ELEMENT_COUNT = 5;
ICommitterTridentSpout.Emitterインターフェイスを実装するクラスは、バッチを形成します。 バッチ処理でエラーが発生した場合は、バッチを再度作成します。
重要-再構築されたバッチには、元のバッチとまったく同じタプルセットが含まれている必要があります。
非表示のテキスト static class BEmitter implements Emitter {
この場合、
Stateインターフェースを実装するクラス、データベースドライバー:
TxDatabase 。
非表示のテキスト public class TxDatabase implements State {
BaseStateUpdater <S extends State>を継承するクラスは、状態(DB)を変更します:
TxDatabaseUpdater非表示のテキスト public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> { int count;
StateFactoryインターフェースを実装するクラスは、State:
TxDatabaseFactoryをインスタンス化します。
すべてまとめて
TridentTransactionApp :
public class TridentTransactionApp { public static void main( String[] args ) throws Throwable { Logger.getRootLogger().setLevel(Level.ERROR);
Stormのトランザクション機能は、自明でない処理が必要なときに、あるシステムから別のシステムにデータを転送するために使用すると非常に便利です。 たとえば、1つのシステムがファイルを生成し、Stormがそれらをレコードに分割し、それらを並行して処理し、データベースに追加します。 処理エラーの場合、ファイルが削除されず、2回処理されないという保証があります。
PS。 記事の枠組みの中でStormの可能性をすべて明らかにすることは不可能であり、本全体に十分な資料があります。 フレームワークの主要な機能と実際のプロジェクトでのアプリケーションの可能性を示すことができたと思います。
クラスターの展開に関して、最近、すばらしい
記事に出会いました。 繰り返す理由はない。 運用環境でStormを展開するのは本当に簡単です。
PPS Hadoopには、Stormのオンライン処理に類似した
Hadoop Streamingがありますが、Stormとは異なり、トランザクションをサポートしていません。