
Oozieを介して通常のSparkタスクの起動を実装する必要性は長い間空いていましたが、すべての手が届きませんでした。 この記事では、全体のプロセスを説明したいと思います。おそらくそれはあなたの人生を単純化するでしょう。
内容
- 挑戦する
- ハードウェアとインストールされたソフトウェア
- Sparkタスクの作成
- workflow.xmlを書く
- coordinator.xmlの作成
- hdfsでプロジェクトをホストする
- 定期的に実行する
- おわりに
挑戦する
hdfsには次の構造があります。
hdfs://hadoop/project-MD2/data hdfs://hadoop/project-MD2/jobs hdfs://hadoop/project-MD2/status
data
ディレクトリには毎日data
が提供され、日付に従ってディレクトリに分解されます。 たとえば、2017年12月31日のデータは、次の方法でhdfs://hadoop/project/data/2017/12/31/20171231.csv.gz
れますhdfs://hadoop/project/data/2017/12/31/20171231.csv.gz
- 行区切り文字:「\ n」
- 列区切り文字:「;」
- 圧縮方法:gzip
- 列数:5(device_id; lag_A0; lag_A1; flow_1; flow_2)
- 最初の行のタイトルがありません
- 前日のデータは、翌日の00:00から03:00までの時間間隔で適切なディレクトリに記録されることが保証されています。
jobs
ディレクトリには、プロジェクトに直接関連するタスクが含まれています。 また、タスクをこのディレクトリに配置します。
統計は、json形式で各日の空のフィールドの数(nullの値)のstatus
ディレクトリに保存する必要があります。 たとえば、2017年12月31日のデータの場合、 hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json
ファイルが表示されhdfs://hadoop/project-MD2/status/2017/12/31/part-*.json
JSONファイルを受け入れます:
{ "device_id_count_empty" : 0, "lag_A0_count_empty" : 10, "lag_A1_count_empty" : 0, "flow_1_count_empty" : 37, "flow_2_count_empty" : 100 }
ハードウェアとインストールされたソフトウェア
10マシンのクラスターを自由に使用できます。各クラスターには8コアプロセッサと64 GBのRAMがあります。 すべてのマシンのハードドライブの合計容量は100 TBです。 クラスターでタスクを開始するには、 PROJECTS
キューが割り当てられます。
インストールされたソフトウェア:
- Apache Hadoop 2.7.3(Hortonworks)
- Apache Spark 2.0.0
- Apache Oozie 4.2.0
- Scala 11.11.11
- Sbt 1.0.2
Sparkタスクの作成
プロジェクト構造を作成します。以下に示すように、scalaをサポートする開発環境またはコンソールから実行するのは非常に簡単です。
mkdir -p daily-statistic/project echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties echo "" > daily-statistic/project/plugins.sbt echo "" > daily-statistic/build.sbt mkdir -p daily-statistic/src/main/scala
daily-statistic/project/plugins.sbt
、アセンブリのプラグインを追加します。これには、 daily-statistic/project/plugins.sbt
ファイルに次の行を追加します。
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
プロジェクト、依存関係、およびアセンブリ機能の説明をdaily-statistic/build.sbt
。
name := "daily-statistic" version := "0.1" scalaVersion := "2.11.11" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.0" % "provided", "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided" ) assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
daily-statistic
ディレクトリに移動し、 sbt update
コマンドを実行してプロジェクトを更新し、リポジトリから依存関係をプルアップします。
src/main/scala/ru/daily
ディレクトリにStatistic.scala
を作成します
タスクコード:
package ru.daily import org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ object Statistic extends App {
daily-statistic
ディレクトリからsbt assembly
コマンドを使用しsbt assembly
プロジェクトをsbt assembly
ます。 アセンブリが正常に完了すると、 daily-statistic-0.1.jar
タスクを含むパッケージがdaily-statistic/target/scala-2.11
daily-statistic-0.1.jar
ます。
workflow.xmlを書く
Oozieを介してタスクを実行するには、 workflow.xml
ファイルで起動構成を記述する必要があります。 以下がタスクの例です。
<workflow-app name="project-md2-daily-statistic" xmlns="uri:oozie:workflow:0.5"> <global> <configuration> <property> <name>oozie.launcher.mapred.job.queue.name</name> <value>${queue}</value> </property> </configuration> </global> <start to="project-md2-daily-statistic" /> <action name="project-md2-daily-statistic"> <spark xmlns="uri:oozie:spark-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <master>yarn-client</master> <name>project-md2-daily-statistic</name> <class>ru.daily.Statistic</class> <jar>${nameNode}${jobDir}/lib/daily-statistic-0.1.jar</jar> <spark-opts> --queue ${queue} --master yarn-client --num-executors 5 --conf spark.executor.cores=8 --conf spark.executor.memory=10g --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.yarn.jars=*.jar --conf spark.yarn.queue=${queue} </spark-opts> <arg>${nameNode}${dataDir}</arg> <arg>${datePartition}</arg> <arg>${nameNode}${saveDir}</arg> </spark> <ok to="end" /> <error to="fail" /> </action> <kill name="fail"> <message>Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end" /> </workflow-app>
global
ブロックでは、タスクを見つけて実行するMapReduceタスクのキューが設定されます。
action
ブロックは、アクション、この場合はスパークタスクの起動、および完了時に実行する必要があるものをステータス
またはERROR
記述します。
spark
ブロックでは、環境が定義され、タスクが構成され、引数が渡されます。 タスクの起動構成については、 spark-opts
説明していspark-opts
。 パラメータは公式ドキュメントに記載されています。
タスクがステータスERROR
で完了すると、実行はERROR
終了ブロックに渡され、複数のエラーメッセージが表示されます。
中括弧内のパラメーター、たとえば${queue}
、起動時に決定します。
coordinator.xmlの作成
定期的な起動を整理するには、別のcoordinator.xml
が必要です。 以下がタスクの例です。
<coordinator-app name="project-md2-daily-statistic-coord" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <action> <workflow> <app-path>${workflowPath}</app-path> <configuration> <property> <name>datePartition</name> <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}</value> </property> </configuration> </workflow> </action> </coordinator-app>
ここでは、実行頻度、タスクの開始日時、タスクの終了日時をそれぞれ決定するfrequency
、 start
、 end
パラメーターからおもしろい。
workflow
ブロックでは、 workflow.xml
ファイルがあるディレクトリへのパスが指定されています。これは、後で起動時に指定します。
configuration
ブロックで、 datePartition
プロパティーの値がdatePartition
されます。この場合、 yyyy/MM/dd
から1日を引いた形式の現在の日付に等しくなります。
hdfsでプロジェクトをホストする
既に述べたように、タスクをhdfs://hadoop/project-MD2/jobs
ディレクトリ:
hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib
ここでは、原則として、 sharelib
ディレクトリを除き、コメントなしですべてが明確になります。 このディレクトリに、タスクの作成プロセスで使用されたすべてのライブラリを配置します。 私たちの場合、これらはすべてプロジェクトの依存関係で指定したSpark 2.0.0ライブラリです。 なぜこれが必要なのですか? 事実は、プロジェクトの依存関係で"provided"
を指定したことです。 これは、ビルドシステムがプロジェクトに依存関係を含める必要がなく、それらがスタートアップ環境によって提供されるが、世界が静止していないことを意味します。 タスクはこの更新の影響を受けやすいため、 sharelib
ディレクトリのライブラリセットを使用して実行します。 これがどのように構成されているかを以下に示します。
定期的に実行する
そして、すべてが発射のエキサイティングな瞬間に備えています。 コンソールからタスクを実行します。 起動時に、 xml
ファイルで使用したプロパティの値を設定する必要があります。 これらのプロパティを別のcoord.properties
ファイルにcoord.properties
ます。
# nameNode=hdfs://hadoop jobTracker=hadoop.host.ru:8032 # coordinator.xml oozie.coord.application.path=/project-MD2/jobs/daily-statistic # ( 24 ) frequency=1440 startTime=2017-09-01T07:00Z endTime=2099-09-01T07:00Z # workflow.xml workflowPath=/project-MD2/jobs/daily-statistic # , mapreduce.job.user.name=username user.name=username # dataDir=/project-MD2/data saveDir=/project-MD2/status jobDir=/project-MD2/jobs/daily-statistic # queue=PROJECTS # hdfs oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib oozie.use.system.libpath=false
すごい、すべてがこする準備ができています。 次のコマンドで通常の実行を開始します。
oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run
開始後、タスクのjob_idがコンソールに表示されます。 このjob_idを使用すると、タスクのステータスに関する情報を確認できます。
oozie job -info {job_id}
タスクを停止します。
oozie job -kill {job_id}
job_idタスクがわからない場合は、ユーザーのすべての通常のタスクを表示して見つけることができます。
oozie jobs -jobtype coordinator -filter user={user_name}
おわりに
それは少し遅れたことが判明しましたが、私の意見では、詳細な指示はインターネットでのクエスト検索よりも優れています。 説明された経験があなたの役に立つことを願っています、あなたの注意に感謝します!