http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#a9a
$ wget http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a
$ wget http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a.t
scala> :paste
val rawTrainDf = spark.read.format("libsvm").load("a9a")

// Label range of the training set, used below to rescale labels.
// The vals are deliberately NOT named `max`/`min`: those names would shadow the
// SQL functions called in the initializer and make the definition refer to itself.
// A global aggregate (no `groupBy`) produces one row, so `first` is used instead
// of `collect.map`, which would yield an Array rather than a single tuple.
val (labelMax, labelMin) =
  rawTrainDf.select(max($"label"), min($"label")).first match {
    case Row(hi: Double, lo: Double) => (hi, lo)
  }

val trainDf = rawTrainDf.select(
  // `label` must be [0.0, 1.0]
  rescale($"label", lit(labelMin), lit(labelMax)).as("label"),
  $"features"
)

scala> trainDf.printSchema
root
 |-- label: float (nullable = true)
 |-- features: vector (nullable = true)

scala> :paste
// The test labels are rescaled with the range computed from the training set,
// then each feature vector is exploded into one (feature, value) row per entry.
val testDf = spark.read.format("libsvm").load("a9a.t")
  .select(rowid(), rescale($"label", lit(labelMin), lit(labelMax)).as("label"), $"features")
  .explode_vector($"features")
  .select($"rowid", $"label".as("target"), $"feature", $"weight".as("value"))
  .cache

scala> testDf.printSchema
root
 |-- rowid: string (nullable = true)
 |-- target: float (nullable = true)
 |-- feature: string (nullable = true)
 |-- value: double (nullable = true)
#Training
scala> :paste
// Train on the features passed through `append_bias`, then collapse the trained
// rows into one averaged weight per feature.
val modelDf = trainDf
  .train_logregr(append_bias($"features"), $"label")
  .groupBy("feature")
  .agg(avg($"weight").as("weight"))
  .cache()
#Test
scala> :paste
// For every test row, sum weight * value over its features and threshold the
// sigmoid of that sum at 0.5 to obtain a 0.0/1.0 prediction.
val predictDf = testDf
  .join(modelDf, testDf("feature") === modelDf("feature"), "LEFT_OUTER")
  .select($"rowid", ($"weight" * $"value").as("value"))
  .groupBy("rowid")
  .agg(sum($"value").as("score"))
  .select(
    $"rowid",
    when(sigmoid($"score") > 0.5, 1.0).otherwise(0.0).as("predicted")
  )
#Evaluation
// Join predictions back to the test rows and report the fraction of joined rows
// whose predicted label equals the target.
scala> val df = predictDf.join(testDf, predictDf("rowid") === testDf("rowid"), "INNER")

scala> df.where($"target" === $"predicted").count.toDouble / df.count
Double = 0.8327921286841418