package org.template.classification

import io.prediction.controller.P2LAlgorithm
import io.prediction.controller.Params

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkContext

import grizzled.slf4j.Logger
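
// Algorithm parameters. lambda is the additive (Laplace) smoothing parameter
// passed straight through to MLlib's NaiveBayes.train below.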
case class AlgorithmParams(
  lambda: Double
) extends Params

// extends P2LAlgorithm because MLlib's NaiveBayesModel doesn't contain an RDD.
class NaiveBayesAlgorithm(val ap: AlgorithmParams)
  extends P2LAlgorithm[PreparedData, NaiveBayesModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
    // MLlib's NaiveBayes cannot handle empty training data.
    require(!data.labeledPoints.take(1).isEmpty,
      s"RDD[labeledPoints] in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")

    NaiveBayes.train(data.labeledPoints, ap.lambda)
  }
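
  // Predict the class label for a single query by converting its feature array
  // into a dense MLlib vector and scoring it with the trained model.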
  def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
    val label = model.predict(Vectors.dense(query.features))
    new PredictedResult(label)
  }
}
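
// For reference: this algorithm assumes Query exposes the raw feature values and
// PredictedResult wraps the predicted label. A minimal sketch of shapes consistent
// with the calls above (the template's actual definitions live elsewhere in this
// package, e.g. Engine.scala, so these are illustrative only):
//
//   class Query(val features: Array[Double]) extends Serializable
//   class PredictedResult(val label: Double) extends Serializable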