blob: 5723bc1e9888f83224134bf10d90e0664b515779 [file] [log] [blame]
package org.example.classification
import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkContext
import grizzled.slf4j.Logger
case class AlgorithmParams(
lambda: Double
) extends Params
// extends P2LAlgorithm because the MLlib's NaiveBayesModel doesn't contain RDD.
class NaiveBayesAlgorithm(val ap: AlgorithmParams)
extends P2LAlgorithm[PreparedData, NaiveBayesModel, Query, PredictedResult] {
@transient lazy val logger = Logger[this.type]
def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
// MLLib NaiveBayes cannot handle empty training data.
require(data.labeledPoints.take(1).nonEmpty,
s"RDD[labeledPoints] in PreparedData cannot be empty." +
" Please check if DataSource generates TrainingData" +
" and Preparator generates PreparedData correctly.")
NaiveBayes.train(data.labeledPoints, ap.lambda)
}
def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
val label = model.predict(Vectors.dense(
Array(query.attr0, query.attr1, query.attr2)
))
new PredictedResult(label)
}
}