Scalaデヌタ分析-緊急のニヌズですか、それずも楜しい機䌚ですか


デヌタサむ゚ンスの分野における埓来のツヌルは、 RやPythonなどの蚀語です。リラックスした構文ず、機械孊習およびデヌタ凊理甚の倚数のラむブラリにより、いく぀かの実甚的な゜リュヌションをすばやく取埗できたす。 ただし、これらのツヌルの制限が重倧な障害ずなる状況がありたす。たず、凊理速床の点で高いパフォヌマンスを達成する必芁がある堎合や、非垞に倧きなデヌタセットを䜿甚する必芁がある堎合です。 この堎合、スペシャリストはしぶしぶ「ダヌクサむド」の助けを借りお、「産業甚」プログラミング蚀語 Scala 、 Java 、 C ++のツヌルを接続する必芁がありたす。


しかし、こちら偎はずおも暗いですか 長幎の開発を経お、「産業甚」デヌタサむ゚ンスのツヌルは倧きく進歩し、今日では2〜3幎前の独自のバヌゞョンずは倧きく異なりたす。 SNA Hackathon 2019タスクの䟋を䜿甚しお、Scala + Spark゚コシステムがPython Data Scienceにどれだけ察応できるかを考えおみたしょう。


SNA Hackathon 2019のフレヌムワヌク内で、参加者は、゜ヌシャルネットワヌクのナヌザヌのニュヌスフィヌドを、テキスト、画像、たたは機胜ログのデヌタを䜿甚する3぀の「分野」のいずれかに分類する問題を解決したす。 この出版物では、埓来の機械孊習ツヌルを䜿甚しお、Sparkで暙識のログに基づいお問題を解決する方法を説明したす。


問題を解決するには、モデルを開発するずきにデヌタ分析の専門家が経隓する暙準的な方法を䜿甚したす。



「旅」では、 Zeppelinむンタラクティブなノヌトブック、 Spark ML機械孊習ラむブラリ、その拡匵機胜PravdaML 、 GraphX グラフ䜜成パッケヌゞ、 Vegas芖芚化ラむブラリ、そしおもちろんApache Sparkなどのツヌルに粟通したす。  すべおのコヌドず実隓結果は、 Zeplコラボレヌティブノヌトパッドプラットフォヌムで利甚できたす 。


デヌタの読み蟌み


SNA Hackathon 2019でレむアりトされたデヌタの機胜は、Pythonを䜿甚しお盎接凊理できるこずですが、Apache Parquet列圢匏の機胜のおかげで゜ヌスデヌタは非垞に効率的に圧瞮され、メモリに「額で」読み蟌たれるず数十ギガバむトに圧瞮解陀されたす。 Apache Sparkを䜿甚する堎合、デヌタをメモリに完党にロヌドする必芁はありたせん。Sparkアヌキテクチャはデヌタを断片的に凊理し、必芁に応じおディスクからロヌドするように蚭蚈されおいたす。


したがっお、最初のステップ日ごずのデヌタ分垃の確認は、ボックス化されたツヌルで簡単に実行できたす。


val train = sqlContext.read.parquet("/events/hackatons/SNAHackathon/2019/collabTrain") z.show(train.groupBy($"date").agg( functions.count($"instanceId_userId").as("count"), functions.countDistinct($"instanceId_userId").as("users"), functions.countDistinct($"instanceId_objectId").as("objects"), functions.countDistinct($"metadata_ownerId").as("owners")) .orderBy("date")) 

察応するグラフが Zeppelinに衚瀺するもの



Scalaの構文は非垞に柔軟性があり、同じコヌドはたずえば次のように芋えるかもしれたせん。


 val train = sqlContext.read.parquet("/events/hackatons/SNAHackathon/2019/collabTrain") z.show( train groupBy $"date" agg( count($"instanceId_userId") as "count", countDistinct($"instanceId_userId") as "users", countDistinct($"instanceId_objectId") as "objects", countDistinct($"metadata_ownerId") as "owners") orderBy "date" ) 

ここで重芁な譊告を行う必芁がありたす。誰もが自分の奜みの芳点からのみScalaコヌドの䜜成に取り組む倧芏暡なチヌムで䜜業する堎合、コミュニケヌションははるかに困難です。 そのため、コヌドスタむルの統䞀された抂念を開発する方が適切です。


しかし、タスクに戻りたす。 日ごずの簡単な分析では、2月17日ず18日に異垞なポむントの存圚が瀺されたした。 おそらく最近では䞍完党なデヌタが収集されおおり、圢質の分垃は偏っおいる可胜性がありたす。 これは、さらに分析する際に考慮する必芁がありたす。 さらに、䞀意のナヌザヌの数がオブゞェクトの数に非垞に近いため、オブゞェクトの数が異なるナヌザヌの分垃を調査するこずは理にかなっおいたす。


 z.show(filteredTrain .groupBy($"instanceId_userId").count .groupBy("count").agg(functions.log(functions.count("count")).as("withCount")) .orderBy($"withCount".desc) .limit(100) .orderBy($"count")) 


非垞に長いテヌルを持぀指数関数に近い分垃が芋られるず予想されたす。 このようなタスクでは、原則ずしお、さたざたなレベルのアクティビティを持぀ナヌザヌのモデルをセグメント化するこずにより、䜜業の品質を向䞊させるこずができたす。 これを行う䟡倀があるかどうかを確認するには、テストセット内のナヌザヌごずのオブゞェクト数の分垃を比范したす。



テストずの比范は、テストナヌザヌがログに少なくずも2぀のオブゞェクトを持っおいるこずを瀺しおいたすランキングタスクはハッカ゜ンで解決されるため、これは品質を評䟡するための必芁条件です。 将来は、トレヌニングセットのナヌザヌをより詳しく調べるこずをお勧めしたす。トレヌニングセットでは、ナヌザヌ定矩関数をフィルタヌで宣蚀したす。


 //  ,     "",   , //     val testSimilar = sc.broadcast(filteredTrain.groupBy($"instanceId_userId") .agg( functions.count("feedback").as("count"), functions.sum(functions.expr("IF(array_contains(feedback, 'Liked'), 1.0, 0.0)")).as("sum") ) .where("count > sum AND sum > 0") .select("instanceId_userId").rdd.map(_.getInt(0)).collect.sorted) //           // User Defined Function val isTestSimilar = sqlContext.udf.register("isTestSimilar", (x: Int) => java.util.Arrays.binarySearch(testSimilar.value, x) >= 0) 

ここでも重芁な発蚀を行う必芁がありたす。Scala/ JavaずPythonでのSparkの䜿甚が著しく異なるのは、UDFを定矩するずいう芳点からです。 PySparkコヌドは基本的な機胜を䜿甚したすが、すべおがほが同じ速床で機胜したすが、オヌバヌラむドされた関数が衚瀺されるず、PySparkのパフォヌマンスは桁違いに䜎䞋したす。


最初のMLパむプラむン


次のステップでは、アクションず属性に関する基本的な統蚈の蚈算を詊みたす。 ただし、このためにはSparkMLの機胜が必芁なので、たずその䞀般的なアヌキテクチャを芋おいきたす。



SparkMLは、次の抂念に基づいお構築されおいたす。



MLアルゎリズムの圢成に察するこのようなアプロヌチは、明確なモゞュヌル構造ず優れた再珟性を実珟するのに圹立ちたす。モデルずパむプラむンの䞡方を節玄できたす。


たず、トレヌニングセットのナヌザヌのアクションの分垃フィヌドバックフィヌルドの統蚈を蚈算する単玔なパむプラむンを構築したす。


 val feedbackAggregator = new Pipeline().setStages(Array( //         (feedback)  one-hot  new MultinominalExtractor().setInputCol("feedback").setOutputCol("feedback"), //       new VectorStatCollector() .setGroupByColumns("date").setInputCol("feedback") .setPercentiles(Array(0.1,0.5,0.9)), //        new VectorExplode().setValueCol("feedback") )).fit(train) z.show(feedbackAggregator .transform(filteredTrain) .orderBy($"date", $"feedback")) 

このパむプラむンでは、 PravdaMLの機胜が積極的に䜿甚されおいたす。぀たり、SparkML甚の拡匵された䟿利なブロックを備えたラむブラリです。



䜜業の結果は、デヌタセット内のクラスのバランスが取れおいないこずを瀺すグラフになりたすが、タヌゲットLikedクラスの䞍均衡は極端ではありたせん。



テスト察象ログに「ポゞティブ」ず「ネガティブ」の䞡方があるに類䌌するナヌザヌ間の類䌌分垃の分析は、ポゞティブクラスに偏っおいるこずを瀺しおいたす。



兆候の統蚈分析


次の段階では、属性の統蚈特性の詳现な分析を実行したす。 今回は、より倧きなコンベアが必芁です。


 val statsAggregator = new Pipeline().setStages(Array( new NullToDefaultReplacer(), //          new AutoAssembler() .setColumnsToExclude( (Seq("date", "feedback") ++ train.schema.fieldNames.filter(_.endsWith("Id")) : _*)) .setOutputCol("features"), new VectorStatCollector() .setGroupByColumns("date").setInputCol("features") .setPercentiles(Array(0.1,0.5,0.9)), new VectorExplode().setValueCol("features") )) 

これから、個別のフィヌルドではなく、すべおの属性を䞀床に凊理する必芁があるため、さらに2぀の䟿利なPravdaMLナヌティリティを䜿甚したす。



結果のパむプラむンを䜿甚しお、3぀のセットトレヌニング、ナヌザヌフィルタヌずテストによるトレヌニングの統蚈を蚈算し、個別のファむルに保存したす。


 //   (   AutoAssembler  ) val trained = statsAggregator.fit(filteredTrain) //       - ,     . trained .transform(filteredTrain .withColumn("date", //  ,      ,     , //        All   functions.explode(functions.array(functions.lit("All"), $"date")))) .coalesce(7).write.mode("overwrite").parquet("sna2019/featuresStat") trained .transform(filteredTrain .where(isTestSimilar($"instanceId_userId")) .withColumn("date", functions.explode(functions.array(functions.lit("All"), $"date")))) .coalesce(7).write.mode("overwrite").parquet("sna2019/filteredFeaturesStat") trained .transform(filteredTest.withColumn("date", functions.explode(functions.array(functions.lit("All"), $"date")))) .coalesce(3).write.mode("overwrite").parquet("sna2019/testFeaturesStat") 

フィヌチャの統蚈情報を含む3぀のデヌタセットを受け取った埌、次のこずを分析したす。



これらの偎面を明確にするために、次のリク゚ストが圹立ちたす。


 def compareWithTest(data: DataFrame) : DataFrame = { data.where("date = 'All'") .select( $"features", //         // ( ) functions.log($"features_mean" / $"features_p50").as("skewenes"), //    90-      //    90-  —    functions.log( ($"features_max" - $"features_p90") / ($"features_p90" - $"features_p50")).as("outlieres"), //       ,  //    ($"features_nonZeros" / $"features_count").as("train_fill"), $"features_mean".as("train_mean")) .join(testStat.where("date = 'All'") .select($"features", $"features_mean".as("test_mean"), ($"features_nonZeros" / $"features_count").as("test_fill")), Seq("features")) //          .withColumn("meanDrift", (($"train_mean" - $"test_mean" ) / ($"train_mean" + $"test_mean"))) //      .withColumn("fillDrift", ($"train_fill" - $"test_fill") / ($"train_fill" + $"test_fill")) } //         val comparison = compareWithTest(trainStat).withColumn("mode", functions.lit("raw")) .unionByName(compareWithTest(filteredStat).withColumn("mode", functions.lit("filtered"))) 

この段階では、芖芚化の問題は緊急です。ツェッペリンの通垞のツヌルを䜿甚するず、すべおの偎面をすぐに衚瀺するこずは難しく、膚倧なグラフを含むノヌトブックは肥倧化したDOMにより著しく遅くなり始めたす。 Vegas - vega-lite仕様を䜜成するためのScalaのDSLラむブラリは、この問題を解決できたす。 Vegasは、豊富な芖芚化機胜matplotlibず同等を提䟛するだけでなく、DOMを拡匵するこずなくCanvasに描画したす:)。


興味のあるチャヌトの仕様は次のようになりたす。


 vegas.Vegas(width = 1024, height = 648) //   .withDataFrame(comparison.na.fill(0.0)) //           .encodeX("meanDrift", Quant, scale = Scale(domainValues = List(-1.0, 1.0), clamp = true)) //   -       .encodeY("train_fill", Quant) //       .encodeColor("outlieres", Quant, scale=Scale( rangeNominals=List("#00FF00", "#FF0000"), domainValues = List(0.0, 5), clamp = true)) //       .encodeSize("skewenes", Quant) //   -   (   ) .encodeShape("mode", Nom) .mark(vegas.Point) .show 

以䞋のチャヌトは次のようになりたす。




したがっお、次の結論を導き出すこずができたす。



盞関分析


属性がどのように分散され、トレヌニングセットずテストセットの間でどのように関係するかに぀いおの䞀般的なアむデアを埗た埌、盞関を分析しおみたしょう。 これを行うには、以前の芳枬に基づいお特城抜出を構成したす。


 //             val expressions = filteredTrain.schema.fieldNames //          .filterNot(x => x == "date" || x == "audit_experiment" || idsColumns(x) || x.contains("vd_")) .map(x => if(skewedFeautres(x)) { //      s"log($x) AS $x" } else { //     cappedFeatures.get(x).map(capping => s"IF($x < $capping, $x, $capping) AS $x").getOrElse(x) }) val rawFeaturesExtractor = new Pipeline().setStages(Array( new SQLTransformer().setStatement(s"SELECT ${expressions.mkString(", ")} FROM __THIS__"), new NullToDefaultReplacer(), new AutoAssembler().setOutputCol("features") )) //       val raw = rawFeaturesExtractor.fit(filteredTrain).transform( filteredTrain.where(isTestSimilar($"instanceId_userId"))) 

このパむプラむンの新しい機械のうち、入力テヌブルの任意のSQL倉換を可胜にするSQLTransformerナヌティリティは泚目に倀したす。


盞関を分析するずきは、ワンホットフィヌチャの自然な盞関によっお䜜成されたノむズを陀倖するこずが重芁です。 このため、ベクトルのどの芁玠がどの初期列に察応するかを理解したいず思いたす。 Sparkでのこのタスクは、列メタデヌタデヌタず共に保存ず属性グルヌプを䜿甚しお実行されたす。 次のコヌドブロックは、String型の同じ列に由来する属性名のペアを陀倖するために䜿甚されたす。


 val attributes = AttributeGroup.fromStructField(raw.schema("features")).attributes.get val originMap = filteredTrain .schema.filter(_.dataType == StringType) .flatMap(x => attributes.map(_.name.get).filter(_.startsWith(x.name + "_")).map(_ -> x.name)) .toMap //   ,          val isNonTrivialCorrelation = sqlContext.udf.register("isNonTrivialCorrelation", (x: String, y : String) => //    Scala-quiz   Option originMap.get(x).map(_ != originMap.getOrElse(y, "")).getOrElse(true)) 

ベクトル列を持぀デヌタセットを手元に眮いお、Sparkを䜿甚しお盞互盞関を蚈算するのは非垞に簡単ですが、結果はマトリックスになりたす。展開のために、ペアのセットを少し再生する必芁がありたす。


 val pearsonCorrelation = //    Pearson  Spearman Correlation.corr(raw, "features", "pearson").rdd.flatMap( //           _.getAs[Matrix](0).rowIter.zipWithIndex.flatMap(x => { //   ,   (  , //  ) val name = attributes(x._2).name.get //    ,     x._1.toArray.zip(attributes).map(y => (name, y._2.name.get, y._1)) } //     DataFrame )).toDF("feature1", "feature2", "corr") .na.drop //   .where(isNonTrivialCorrelation($"feature1", $"feature2")) //    . pearsonCorrelation.coalesce(1).write.mode("overwrite") .parquet("sna2019/pearsonCorrelation") 

そしお、もちろん、芖芚化繰り返したすが、ヒヌトマップを描くにはVegasの助けが必芁です。


 vegas.Vegas("Pearson correlation heatmap") .withDataFrame(pearsonCorrelation .withColumn("isPositive", $"corr" > 0) .withColumn("abs_corr", functions.abs($"corr")) .where("feature1 < feature2 AND abs_corr > 0.05") .orderBy("feature1", "feature2")) .encodeX("feature1", Nom) .encodeY("feature2", Nom) .encodeColor("abs_corr", Quant, scale=Scale(rangeNominals=List("#FFFFFF", "#FF0000"))) .encodeShape("isPositive", Nom) .mark(vegas.Point) .show 

結果はZepl-eで芋る方が良いです。 䞀般的な理解のために



ヒヌトマップは、いく぀かの盞関関係が明らかに利甚できるこずを瀺しおいたす。 最も匷く盞関する特城のブロックを遞択しおみたしょう。これには、 GraphXラむブラリを䜿甚したす盞関行列をグラフに倉換し、重みで゚ッゞをフィルタヌ凊理したす。その埌、接続されたコンポヌネントを芋぀け、非劣化コンポヌネントのみを残したす耇数の芁玠から。 このような手順は、 DBSCANアルゎリズムのアプリケヌションに本質的に類䌌しおおり、次のずおりです。


 //   (GrpahX   ID) val featureIndexMap = spearmanCorrelation.select("feature1").distinct.rdd.map( _.getString(0)).collect.zipWithIndex.toMap val featureIndex = sqlContext.udf.register("featureIndex", (x: String) => featureIndexMap(x)) //    val vertices = sc.parallelize(featureIndexMap.map(x => x._2.toLong -> x._1).toSeq, 1) //    val edges = spearmanCorrelation.select(featureIndex($"feature1"), featureIndex($"feature2"), $"corr") //     .where("ABS(corr) > 0.7") .rdd.map(r => Edge(r.getInt(0), r.getInt(1), r.getDouble(2))) //       val components = Graph(vertices, edges).connectedComponents() val reversedMap = featureIndexMap.map(_.swap) //    ,    ,   //   val clusters = components .vertices.map(x => reversedMap(x._2.toInt) -> reversedMap(x._1.toInt)) .groupByKey().map(x => x._2.toSeq) .filter(_.size > 1) .sortBy(-_.size) .collect 

結果は衚圢匏で衚瀺されたす。



クラスタリングの結果に基づいお、最も盞関性の高いグルヌプは、グルヌプのメンバヌシップmembership_status_AずオブゞェクトのタむプinstanceId_objectTypeに関連付けられた蚘号の呚りに圢成されたず結論付けるこずができたす。 暙識の盞互䜜甚の最適なモデリングのために、モデルのセグメンテヌションを適甚するこずは理にかなっおいたす-ナヌザヌが存圚するグルヌプずそうでないグルヌプに別々に異なるタむプのオブゞェクトの異なるモデルをトレヌニングするため。


機械孊習


最も興味深いのは機械孊習です。 SparkMLおよびPravdaML拡匵機胜を䜿甚しお最も単玔なモデルロゞスティック回垰をトレヌニングするためのパむプラむンは次のずおりです。


  new Pipeline().setStages(Array( new SQLTransformer().setStatement( """SELECT *, IF(array_contains(feedback, 'Liked'), 1.0, 0.0) AS label FROM __THIS__"""), new NullToDefaultReplacer(), new AutoAssembler() .setColumnsToExclude("date", "instanceId_userId", "instanceId_objectId", "feedback", "label") .setOutputCol("features"), Scaler.scale(Interceptor.intercept(UnwrappedStage.repartition( new LogisticRegressionLBFSG(), numPartitions = 127))) 

ここでは、倚くの銎染みのある芁玠だけでなく、いく぀かの新しい芁玠も確認できたす。



結果のパむプラむンは、すべおのデヌタに適甚され、ナヌザヌごずのAUC 0.6889を提䟛したす怜蚌コヌドはZeplで利甚可胜です。 デヌタのフィルタヌ凊理、機胜の倉換、セグメントモデルのすべおの研究を適甚するこずは今でも残っおいたす。 最終的なパむプラむンは次のようになりたす。


  new Pipeline().setStages(Array( new SQLTransformer().setStatement(s"SELECT instanceId_userId, instanceId_objectId, ${expressions.mkString(", ")} FROM __THIS__"), new SQLTransformer().setStatement("""SELECT *, IF(array_contains(feedback, 'Liked'), 1.0, 0.0) AS label, concat(IF(membership_status = 'A', 'OwnGroup_', 'NonUser_'), instanceId_objectType) AS type FROM __THIS__"""), new NullToDefaultReplacer(), new AutoAssembler() .setColumnsToExclude("date", "instanceId_userId", "instanceId_objectId", "feedback", "label", "type","instanceId_objectType") .setOutputCol("features"), CombinedModel.perType( Scaler.scale(Interceptor.intercept(UnwrappedStage.repartition( new LogisticRegressionLBFSG(), numPartitions = 127))), numThreads = 6) )) 

PravdaML — CombinedModel.perType. , numThreads = 6. .


, , per-user AUC 0.7004. ? , " " XGBoost :


 new Pipeline().setStages(Array( new SQLTransformer().setStatement("""SELECT *, IF(array_contains(feedback, 'Liked'), 1.0, 0.0) AS label FROM __THIS__"""), new NullToDefaultReplacer(), new AutoAssembler() .setColumnsToExclude("date", "instanceId_userId", "instanceId_objectId", "feedback", "label") .setOutputCol("features"), new XGBoostRegressor() .setNumRounds(100) .setMaxDepth(15) .setObjective("reg:logistic") .setNumWorkers(17) .setNthread(4) .setTrackerConf(600000L, "scala") )) 

, — XGBoost Spark ! DLMC , PravdaML , ( ). XGboost " " 10 per-user AUC 0.6981.


結果分析


, , , . SparkML , . PravdaML : Parquet Spark:


 //     val perTypeWeights = sqlContext.read.parquet("sna2019/perType/stages/*/weights") //     20    ( //  ) val topFeatures = new TopKTransformer[Double]() .setGroupByColumns("type") .setColumnToOrderGroupsBy("abs_weight") .setTopK(20) .transform(perTypeWeights.withColumn("abs_weight", functions.abs($"unscaled_weight"))) .orderBy("type", "unscaled_weight") 

Parquet, PravdaML — TopKTransformer, .


Vegas ( Zepl ):



, - . XGBoost?


 val significance = sqlContext.read.parquet( "sna2019/xgBoost15_100_raw/stages/*/featuresSignificance" vegas.Vegas() .withDataFrame(significance.na.drop.orderBy($"significance".desc).limit(40)) .encodeX("name", Nom, sortField = Sort("significance", AggOps.Mean)) .encodeY("significance", Quant) .mark(vegas.Bar) .show 


, , XGBoost, , . . , XGBoost , , .


結論


, :). :


  1. , Scala Spark , , , , .
  2. Scala Spark Python: ETL ML, , , .
  3. , , , (, ) , , .
  4. , , . , , , -, .

, , , , -. , , " Scala " Newprolab.


, , — SNA Hackathon 2019 .



Source: https://habr.com/ru/post/J442688/


All Articles