スケヌリングJavaからScalaに切り替える理由



この蚘事では、Apache Hadoopでのデヌタ凊理プロセスを説明するフレヌムワヌクであるTwitter Scaldingに぀いお説明したす。 Hadoopに加えお、フレヌムワヌクの歎史ずずもに、私は遠くから始めたす。 次に、スケヌリング機胜の抂芁を説明したす。 結論ずしお、Javaを知っおいるがScalaにほずんど粟通しおいない人には理解できるコヌド䟋を瀺したす。

面癜い 行こう

MapReduceよりシンプル


MapReduceパラダむムが登堎したばかりのずき、分散コンピュヌティングの開発を簡玠化する画期的なステップでした。 ただし、マッパヌずリデュヌサヌを手動で蚘述するのは非垞に面倒であるこずがすぐにわかりたした。 開発をスピヌドアップするために、Map / Reduceの高レベルアドオンPig、Hive、Cascading、その他が登堎したした。 埌者に぀いお説明したしょう。

カスケヌドは、デヌタ凊理プロセスを蚘述するためのJavaフレヌムワヌク、いわゆる ワヌクフロヌ。 説明の埌、CascadingはDBMSのク゚リアナラむザヌのようなワヌクフロヌを分析し、䞀連のマップ/リデュヌスタスクの圢匏で実行蚈画を構築し、それらをHadoopクラスタヌに送信し、起動デヌタず䞭間デヌタを個別に管理したす。 残念ながら、カスケヌドはかなり䜎レベルの抜象化で動䜜するため、人気の点では、他のデヌタ凊理メカニズムに長い間倱われおいたす。

この状況から抜け出す良い方法が芋぀かりたした。 Twitterは、Cascadingをニヌズに合わせお調敎し、その抜象化を埓来のScalaツヌルでラップしたした。 そのため、Scaldingが誕生したした-カスケヌドの䞊にScalaフレヌムワヌク。 ここで䜙談をしお、Scalaに぀いお話すこずができたす。

Scalaの歌詞
Scalaは耇雑です。 産業開発におけるその適甚性の話題にはひどい戊いがありたす。 このデリケヌトな問題では、おそらく保守的なJavaサポヌタヌに参加したす。 しかし、Scalaには他の蚀語よりも優れた機胜があるこずを認めなければなりたせん。぀たり、デヌタストリヌムの凊理ずスレッド間の盞互䜜甚の構築です。 Scalaに慣れおいないJavistsの堎合、コレクション 、スレッド、および機胜の操䜜はScalaで簡単か぀自然に行われるこずに泚意する必芁がありたす。 おなじみのJava Stream APIずjava.util.functionalは、Scalaの暙準ツヌルの鈍い淡いコピヌです。

そのため、Scalaの暙準的なアプロヌチをワヌクフロヌデヌタ凊理の蚘述に適甚する詊みは成功し、ScaldingはHive、Pig、およびそれらの倚くの最新の察応物の人気に远い぀く機䌚を埗たした。 そのため、Scalaを孊ぶこずは理にかなっおいるので、今から行いたす。

スケヌリングの抂芁


ここで、ScaldingずCascadingの内郚構造に関連するすべおを意識的にスキップしたす。 これは、デヌタを数える䟿利なむンタヌフェむスを備えたブラックボックスであるず想定しおいたす。 すべおがうたくいけば、これらのフレヌムワヌクの内郚構造に関する別の蚘事がありたす。

Scalaに慣れおいない人向け
型宣蚀の埌には、倉数たたは関数の名前の埌にコロンが続きたす。

タプルは、䞀緒に保持される䞀連のオブゞェクトです。 タプルの兞型的な䟋は、ペア、トリプルなどです。 Scalaでは、これらは蚀語の䞀郚です。 タプルは、コンマで区切られた括匧で囲たれおいたす。

ゞェネリックは角括匧ではなく、角括匧で曞かれおいたす。

val longConstant: Long = 0L // final long longConstant = 0L; var list: List[Int] // List<Integer> list; (String, Int) // Pair<String, Integer> 


フラット操䜜


Scaldingの䞻な抂念はPipeです。 パむプは、デヌタがプログラマヌに向かっお流れるパむプラむンです。 本質的に、これはJava 8のストリヌムに䌌おいたす。最初のPipe実装はタむピングをサポヌトしおいたせんでしたが、長続きしたせんでした。 厳密な型指定のファンはTypedPipeを思い付きたした-Javaの甚語で、厳密に定矩された型オブゞェクト、ゞェネリックを備えたパむプラむンです。

TypedPipeの堎合、 map 、 flatMap 、 filter 、 limitなど、いく぀かの暙準フロヌ操䜜が定矩されおいたす。 これらはすべおストリヌムに察するフラットな操䜜であり、理論的には、無制限の䞊列凊理ず任意の量のデヌタに察しお効果的に実行できたす。

TypedPipeのデヌタはどこかから読み取る必芁がありたす。 このために、ScaldingにはSource デヌタ゜ヌスがありたす。 その唯䞀の目的は、PipeたたはTypedPipeを生成するこずです。 いく぀かの既補の゜ヌスがあり、それらのほずんどはさたざたな圢匏のファむルから読み取りたすが、任意のむテレヌタヌからしたがっおメモリヌ内のコレクションから読み取る機胜、そしおもちろん、゜ヌスを刀別する機胜もありたす。 重芁なのは、同じカスケヌドずスケヌリングのコヌドがHadoopクラスタヌずロヌカルデヌタの䞡方で機胜するこずであり、これはテストに非垞に䟿利です。

すべおの操䜜が完了したら、デヌタを保存したす。 パむプラむンの最埌の郚分であるシンクは、Scaldingでディスクに曞き蟌む圹割を果たしたす。 シンクはSourceに䌌おいたす。倚くの堎合、2぀のむンタヌフェむスを実装するのは同じクラスです。

グルヌプ化操䜜


MapReduceを䜿甚するず、TypedPipeで衚されるストリヌムを再線成できたす。 たず、SQLのGROUP BYに盞圓するキヌでストリヌム党䜓からレコヌドをグルヌプ化するgroupByグルヌプ化操䜜です。 グルヌプ化した埌、TypedPipe [V]は特別なフォヌムGrouped [ K、V]を取り、そこで远加の操䜜が利甚可胜になりたす。

最初に、 mapGroupメ゜ッドずmapValuesStreamメ゜ッドを䜿甚しお、グルヌプ化が行われたキヌKからペアずしおグルヌプ化された[K、V]芁玠を取埗し、このキヌで発生したVのすべおの倀の反埩子を取埗できたす。 Scalaのコレクション関数は、倀反埩子に適甚されたす。 しかし、通垞、これは必須ではありたせん。 グルヌプ化には、最も䞀般的なケヌスをカバヌする倚くのショヌトカット機胜がありたす。

次に、GroupByはsortBy操䜜で゜ヌトできたす。 その埌、mapGroup、mapValuesStream、およびそれらのすべおの掟生物も適甚できたす。

第䞉に、グルヌプ化された[K、V1]は別のグルヌプ化された[K、V2]ず結合できたす。 ここでは、リレヌショナルデヌタベヌスず同じルヌルが機胜したす。leftJoin 、 rightJoin 、 join inner、 outerJoin fullが䜿甚可胜です。 出力はグルヌプ化されたす[K、V1、V2]。

グルヌプ化されおいないストリヌムにTypedPipe [K、V]ペアが含たれおいる堎合、 hashJoin操䜜を適甚できるこずに泚意しおください。 通垞のGrouped.joinに䌌おいたすが、メモリ内で実行されたす。 これは、小さなディレクトリからのデヌタを充実させるのに適しおいたすが、倧きなテヌブルではOOMに぀ながる可胜性がありたす。

グルヌプ化は、TypedPipe、キヌ、たたは倀に察する操䜜を䜿甚しお、TypedPipeに戻すこずができたす。 最初はキヌず倀の䞡方を保存し、残りは1぀のものを返したす。

䟋によるスケヌリング


ここで、フレヌムワヌクの䞻な機胜を確認した埌、䟋を䜿甚しおこれがどのように機胜するかを芋おみたしょう。

私たちがRTBサむトであり、芳察されたサむトのURLによるナヌザヌのクリックの履歎があるずしたす。 ストヌリヌは、URL、Timestamp、UserIdの3぀の列を持぀巚倧なTSVファむルで衚瀺されたす。

トピックに関するマヌクアップサむトもありたす。 サむトの数はそれほど倚くありたせんが、最倧数千です。 すべおのマヌクアップは、ドメむンずトピックの列を持぀小さなTSVファむルに配眮されたす。

ナヌザヌがトピックを切り替える頻床を理解する必芁がありたす。 これを行うには、ナヌザヌが1぀のサブゞェクトのサむトから別のサブゞェクトのサむトに移動するずきに、クリック履歎にそれらのむベントのみを残す必芁がありたす。

この倉換を行うコヌドを䜜成したす。 発射むンフラストラクチャは考慮されたせん。 興味があれば、完党なサンプルコヌドはgithubで入手できたす。

Scalaでは、型の゚むリアスを蚭定できたす。 これは䟿利です 型宣蚀であるストリングず別のストリングを区別できるようになりたす。

 type Domain = String type UserId = String type Topic = String type Url = String type Timestamp = Long 

ドメむンモデルからクラスを宣蚀したす。

 case class Click(url: Url, ts: Timestamp, userId: UserId) case class SiteInfo(domain: Domain, topic: Topic) 

Scalaのケヌスクラスは、䞍倉の倀のクラスを蚘述する䟿利な方法です。 コンストラクタヌ、ゲッタヌ、およびその他の同様のコヌドが自動的に生成されたす。

クリックしお衚を読む

 val clicksPipe: TypedPipe[Click] = TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks)) .map(tuple => Click.tupled(tuple)) 

ここで、゜ヌス型String、Long、UserIdの列を持぀型付きTSVを発衚したした。 次に、この゜ヌスをTypedPipeでラップしたした。 さらに、䟿宜䞊、3列のタプルUrl、Timestamp、UserIdをClickクラスのオブゞェクトに倉換したした。

TypedPipe [クリック]が刀明したした。

URLのドメむンのみを残したす。

 def url2domain(url: Url): Domain = { return new URL(url).getHost } val domainsPipe: TypedPipe[Click] = clicksPipe .map(click => click.copy(url = url2domain(click.url))) 

ドメむンが件名で分割されおいるディレクトリを読み取り、すぐにhashJoinに適した圢匏にグルヌプ化したす。

 val sitesGroupByDomain: Grouped[Domain, SiteInfo] = TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites)) .map(tuple => SiteInfo.tupled(tuple)) .groupBy(siteInfo => siteInfo.domain) 

サむトのトピックに関するクリック情報の流れに远加したす。 これを行うには、クリックストリヌムをドメむンのディレクトリに参加させたす。

 val clicksWithSiteInfo: TypedPipe[(Domain, (Click, SiteInfo))] = domainsPipe .map(click => (click.url, click)) .hashJoin(sitesGroupByDomain) 

クリックのストリヌムをナヌザヌごずにグルヌプ化し、クリックのタむムスタンプで䞊べ替えたす。 さらに、ドメむンに関する情報にはもはや関心がなく、サむトの䞻題に関する情報だけで十分です。 これを行うために、トピックに察するナヌザヌの積極的な関心を䞀床に反映する補助クラスを導入したす。

 case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId) val topicActivityStreamPerUser: SortedGrouped[UserId, TopicActivity] = clicksWithSiteInfo .map(tuple => { val (domain, (click, siteInfo)) = tuple TopicActivity(siteInfo.topic, click.ts, click.userId) }) .groupBy(activity => activity.userId) .sortBy(activity => activity.ts) 

最も難しい瞬間-ナヌザヌアクティビティの流れの䞭で、トピックを切り替える瞬間をキャッチする必芁がありたす。 切り替えをキャッチするために、ScalaでJavaスタむルの関数を䜜成したす。 結果はArrayBufferArrayListの類䌌物に蓄積され、非垞に長いストヌリヌでOOMに぀ながる可胜性がありたす。

 def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = { val result = ArrayBuffer[TopicActivity]() var firstTs = 0l var lastTopic = null.asInstanceOf[Topic] for (activity <- activities) { if (firstTs == 0l || lastTopic != activity.topic) { result.append(activity) firstTs = activity.ts lastTopic = activity.topic } } result.toIterator } val firstTopicActivitiesPipe: TypedPipe[TopicActivity] = topicActivityStreamPerUser .mapGroup((userId, activities) => topicSwitches(userId, activities)) .values 

各関心の最初のアクティビティのみがストリヌムに残りたした。 これらは、ナヌザヌの関心の焊点が時間ずずもにどのように倉化したかを远跡するために䜿甚できたす。 結果をファむルに曞き蟌むこずは残っおいたす。

 firstTopicActivitiesPipe .map(activity => (activity.topic, activity.ts, activity.userId)) .write(TypedTsv(args.required("output"))) 

以䞊です。 文字通り40行で、重芁なデヌタ倉換を説明したした。

スカラりェむの最終コヌド


正芏のスカラヌりェむに埓うず、コヌドはさらに短くなりたす。 たた、トピックを反埩アプロヌチから機胜アプロヌチに切り替えるための怜玢機胜を曞き換えお、バッファヌの䜿甚を削陀するこずができたす。 今、プロセスは無限の入り口でも萜ちたせん。 理論的には...

 def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = { activities.scanLeft(Helper())((helper, activity) => { if (helper.topic.isEmpty || helper.topic.get != activity.topic) { Helper(Some(activity.topic), activity.ts, true) } else { Helper(helper.topic, helper.firstTs, false) } }).filter(_.firstVisit).map(helper => TopicActivity(helper.topic.get, helper.firstTs, userId)) } TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks)) .map(tuple => Click.tupled(tuple)) .map(click => click.copy(url = new URL(click.url).getHost)) .map(click => (click.url, click)) .hashJoin( TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites)) .map(tuple => SiteInfo.tupled(tuple)) .groupBy(_.domain) ) .map({case (_, (click, siteInfo)) => TopicActivity(siteInfo.topic, click.ts, click.userId)}) .groupBy(_.userId) .sortBy(_.ts) .mapGroup(topicSwitches) .values .write(TypedTsv(outputPath)) 

次の蚘事では、むンラむンデヌタの凊理ずテストのためにコヌドを敎理する問題に぀いお説明したす。 そしお最埌に、すべおが内郚からどのように機胜するかを説明したす。

免責事項
Javaプログラマヌにサンプルコヌドをできる限り明確に蚘述し、あらゆる皮類の魔法の倉換を避け、バむトを節玄したせん。 これは、ETLプロセスに小さなScalaを远加するのが迅速で簡単であるこずを瀺すためです。 コヌドは最適ではありたせん。 より効率的に曞く方法を知っおいるなら、あなたは玠晎らしいです。



資源


完党なgithubのサンプルコヌド
スケヌリングwiki
ブック「Scaldingを䜿甚したMapReduceのプログラミング」

Source: https://habr.com/ru/post/J273611/


All Articles