/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.predictionio.controller
import grizzled.slf4j.Logger
import org.apache.predictionio.core.BaseAlgorithm
import org.apache.predictionio.core.BaseDataSource
import org.apache.predictionio.core.BaseEngine
import org.apache.predictionio.core.BasePreparator
import org.apache.predictionio.core.BaseServing
import org.apache.predictionio.core.Doer
import org.apache.predictionio.data.storage.EngineInstance
import org.apache.predictionio.data.storage.StorageClientException
import org.apache.predictionio.workflow.CreateWorkflow
import org.apache.predictionio.workflow.EngineLanguage
import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
import org.apache.predictionio.workflow.NameParamsSerializer
import org.apache.predictionio.workflow.PersistentModelManifest
import org.apache.predictionio.workflow.SparkWorkflowUtils
import org.apache.predictionio.workflow.StopAfterPrepareInterruption
import org.apache.predictionio.workflow.StopAfterReadInterruption
import org.apache.predictionio.workflow.WorkflowParams
import org.apache.predictionio.workflow.WorkflowUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.read
import scala.collection.JavaConversions
import scala.language.implicitConversions
/** This class chains up the entire data process. PredictionIO uses this
* information to create workflows and deployments. In Scala, you should
* implement an object that extends the [[EngineFactory]] trait similar to the
* following example.
*
* {{{
* object ItemRankEngine extends EngineFactory {
* def apply() = {
* new Engine(
* classOf[ItemRankDataSource],
* classOf[ItemRankPreparator],
* Map(
* "knn" -> classOf[KNNAlgorithm],
* "rand" -> classOf[RandomAlgorithm],
* "mahoutItemBased" -> classOf[MahoutItemBasedAlgorithm]),
* classOf[ItemRankServing])
* }
* }
* }}}
*
* @see [[EngineFactory]]
* @tparam TD Training data class.
* @tparam EI Evaluation info class.
* @tparam PD Prepared data class.
* @tparam Q Input query class.
* @tparam P Output prediction class.
* @tparam A Actual value class.
* @param dataSourceClassMap Map of data source names to classes.
* @param preparatorClassMap Map of preparator names to classes.
* @param algorithmClassMap Map of algorithm names to classes.
* @param servingClassMap Map of serving names to classes.
* @group Engine
*/
class Engine[TD, EI, PD, Q, P, A](
val dataSourceClassMap: Map[String,
Class[_ <: BaseDataSource[TD, EI, Q, A]]],
val preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
val algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
val servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
extends BaseEngine[EI, Q, P, A] {
private[predictionio]
implicit lazy val formats: Formats = Utils.json4sDefaultFormats +
new NameParamsSerializer
@transient lazy protected val logger = Logger[this.type]
/** This auxiliary constructor is provided for backward compatibility.
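*
* A minimal usage sketch, assuming hypothetical MyDataSource, MyPreparator,
* MyAlgorithm, and MyServing component classes:
*
* {{{
* val engine = new Engine(
*   classOf[MyDataSource],
*   classOf[MyPreparator],
*   Map("algo" -> classOf[MyAlgorithm]),
*   classOf[MyServing])
* }}}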
*
* @param dataSourceClass Data source class.
* @param preparatorClass Preparator class.
* @param algorithmClassMap Map of algorithm names to classes.
* @param servingClass Serving class.
*/
def this(
dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
preparatorClass: Class[_ <: BasePreparator[TD, PD]],
algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
servingClass: Class[_ <: BaseServing[Q, P]]) = this(
Map("" -> dataSourceClass),
Map("" -> preparatorClass),
algorithmClassMap,
Map("" -> servingClass)
)
/** Java-friendly constructor
*
* @param dataSourceClass Data source class.
* @param preparatorClass Preparator class.
* @param algorithmClassMap Map of algorithm names to classes.
* @param servingClass Serving class.
*/
def this(dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
preparatorClass: Class[_ <: BasePreparator[TD, PD]],
algorithmClassMap: _root_.java.util.Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
servingClass: Class[_ <: BaseServing[Q, P]]) = this(
Map("" -> dataSourceClass),
Map("" -> preparatorClass),
JavaConversions.mapAsScalaMap(algorithmClassMap).toMap,
Map("" -> servingClass)
)
/** Returns a new Engine instance, mimicking a case class's copy method behavior.
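*
* A minimal sketch, assuming an existing engine instance and a hypothetical
* CustomServing class:
*
* {{{
* val newEngine = engine.copy(
*   servingClassMap = Map("" -> classOf[CustomServing]))
* }}}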
*/
def copy(
dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]
= dataSourceClassMap,
preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]]
= preparatorClassMap,
algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]]
= algorithmClassMap,
servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]]
= servingClassMap): Engine[TD, EI, PD, Q, P, A] = {
new Engine(
dataSourceClassMap,
preparatorClassMap,
algorithmClassMap,
servingClassMap)
}
/** Trains this engine and returns a list of models.
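*
* A minimal sketch of assembling the [[EngineParams]] argument, assuming
* hypothetical MyDataSourceParams and MyAlgoParams classes extending Params:
*
* {{{
* val engineParams = new EngineParams(
*   dataSourceParams = ("", MyDataSourceParams(appId = 1)),
*   algorithmParamsList = Seq(("algo", MyAlgoParams(k = 10))))
* }}}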
*
* @param sc An instance of SparkContext.
* @param engineParams An instance of [[EngineParams]] for running a single training.
* @param engineInstanceId ID of the engine instance for this training run.
* @param params An instance of [[WorkflowParams]] that controls the workflow.
* @return A list of models.
*/
def train(
sc: SparkContext,
engineParams: EngineParams,
engineInstanceId: String,
params: WorkflowParams): Seq[Any] = {
val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
val (preparatorName, preparatorParams) = engineParams.preparatorParams
val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
val algoParamsList = engineParams.algorithmParamsList
require(
algoParamsList.nonEmpty,
"EngineParams.algorithmParamsList must have at least 1 element.")
val algorithms = algoParamsList.map { case (algoName, algoParams) =>
Doer(algorithmClassMap(algoName), algoParams)
}
val models = Engine.train(
sc, dataSource, preparator, algorithms, params)
val algoCount = algorithms.size
val algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)] =
(0 until algoCount).map { ax => {
val (name, params) = algoParamsList(ax)
(name, params, algorithms(ax), models(ax))
}}
makeSerializableModels(
sc,
engineInstanceId = engineInstanceId,
algoTuples = algoTuples)
}
/** Algorithm models can be persisted before deploy. However, it is also
* possible that models are not persisted. This method retrains non-persisted
* models and returns a list of models that can be used directly in deploy.
*/
private[predictionio]
def prepareDeploy(
sc: SparkContext,
engineParams: EngineParams,
engineInstanceId: String,
persistedModels: Seq[Any],
params: WorkflowParams): Seq[Any] = {
val algoParamsList = engineParams.algorithmParamsList
val algorithms = algoParamsList.map { case (algoName, algoParams) =>
Doer(algorithmClassMap(algoName), algoParams)
}
val models = if (persistedModels.exists(m => m.isInstanceOf[Unit])) {
// If any of persistedModels is Unit, we need to re-train the model.
logger.info("Some persisted models are Unit, need to re-train.")
val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
val (preparatorName, preparatorParams) = engineParams.preparatorParams
val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
val td = dataSource.readTrainingBase(sc)
val pd = preparator.prepareBase(sc, td)
val models = algorithms.zip(persistedModels).map { case (algo, m) =>
m match {
case () => algo.trainBase(sc, pd)
case _ => m
}
}
models
} else {
logger.info("Using persisted model")
persistedModels
}
models
.zip(algorithms)
.zip(algoParamsList)
.zipWithIndex
.map {
case (((model, algo), (algoName, algoParams)), ax) => {
model match {
case modelManifest: PersistentModelManifest => {
logger.info("Custom-persisted model detected for algorithm " +
algo.getClass.getName)
SparkWorkflowUtils.getPersistentModel(
modelManifest,
Seq(engineInstanceId, ax, algoName).mkString("-"),
algoParams,
Some(sc),
getClass.getClassLoader)
}
case m => {
try {
logger.info(
s"Loaded model ${m.getClass.getName} for algorithm " +
s"${algo.getClass.getName}")
sc.stop()
} catch {
case e: NullPointerException =>
logger.warn(
s"Null model detected for algorithm ${algo.getClass.getName}")
}
m
}
} // model match
}
}
}
/** Extracts models for the persistence layer.
*
* PredictionIO persists models for future use. It allows custom
* implementations for persisting models. You need to implement the
* [[org.apache.predictionio.controller.PersistentModel]] interface. This method
* traverses all models in the workflow. If a model is a
* [[org.apache.predictionio.controller.PersistentModel]], it calls the save method
* for custom persistence logic.
*
* For models that do not support custom logic, PredictionIO serializes the
* whole model if the corresponding algorithm is local. On the other hand, if
* the model is parallel (i.e. a model associated with a number of huge RDDs),
* this method returns Unit, in which case PredictionIO will retrain the whole
* model from scratch the next time it is used.
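*
* A minimal sketch of the custom-persistence route, assuming hypothetical
* MyModel and MyAlgoParams (extending Params) classes:
*
* {{{
* class MyModel extends PersistentModel[MyAlgoParams] {
*   def save(id: String, params: MyAlgoParams, sc: SparkContext): Boolean = {
*     // Persist to a custom store keyed by id. Returning true signals that
*     // this model is handled and needs no default serialization.
*     true
*   }
* }
* }}}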
*/
private def makeSerializableModels(
sc: SparkContext,
engineInstanceId: String,
// (algorithm name, algorithm params, algorithm instance, model)
algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)]
): Seq[Any] = {
logger.info(s"engineInstanceId=$engineInstanceId")
algoTuples
.zipWithIndex
.map { case ((name, params, algo, model), ax) =>
algo.makePersistentModel(
sc = sc,
modelId = Seq(engineInstanceId, ax, name).mkString("-"),
algoParams = params,
bm = model)
}
}
/** This is implemented such that [[org.apache.predictionio.controller.Evaluation]] can
* use this method to generate inputs for [[org.apache.predictionio.controller.Metric]].
*
* @param sc An instance of SparkContext.
* @param engineParams An instance of [[EngineParams]] for running a single evaluation.
* @param params An instance of [[WorkflowParams]] that controls the workflow.
* @return A list of evaluation information and RDDs of (query, predicted
*         result, actual result) tuples.
*/
def eval(
sc: SparkContext,
engineParams: EngineParams,
params: WorkflowParams)
: Seq[(EI, RDD[(Q, P, A)])] = {
val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
val (preparatorName, preparatorParams) = engineParams.preparatorParams
val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
val algoParamsList = engineParams.algorithmParamsList
require(
algoParamsList.nonEmpty,
"EngineParams.algorithmParamsList must have at least 1 element.")
val algorithms = algoParamsList.map { case (algoName, algoParams) => {
try {
Doer(algorithmClassMap(algoName), algoParams)
} catch {
case e: NoSuchElementException => {
if (algoName == "") {
logger.error("Empty algorithm name supplied but it could not " +
"match with any algorithm in the engine's definition. " +
"Existing algorithm name(s) are: " +
s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
} else {
logger.error(s"$algoName cannot be found in the engine's " +
"definition. Existing algorithm name(s) are: " +
s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
}
sys.exit(1)
}
}
}}
val (servingName, servingParams) = engineParams.servingParams
val serving = Doer(servingClassMap(servingName), servingParams)
Engine.eval(sc, dataSource, preparator, algorithms, serving)
}
override def jValueToEngineParams(
variantJson: JValue,
jsonExtractor: JsonExtractorOption): EngineParams = {
val engineLanguage = EngineLanguage.Scala
// Extract EngineParams
logger.info(s"Extracting datasource params...")
val dataSourceParams: (String, Params) =
WorkflowUtils.getParamsFromJsonByFieldAndClass(
variantJson,
"datasource",
dataSourceClassMap,
engineLanguage,
jsonExtractor)
logger.info(s"Datasource params: $dataSourceParams")
logger.info(s"Extracting preparator params...")
val preparatorParams: (String, Params) =
WorkflowUtils.getParamsFromJsonByFieldAndClass(
variantJson,
"preparator",
preparatorClassMap,
engineLanguage,
jsonExtractor)
logger.info(s"Preparator params: $preparatorParams")
val algorithmsParams: Seq[(String, Params)] =
variantJson findField {
case JField("algorithms", _) => true
case _ => false
} map { jv =>
val algorithmsParamsJson = jv._2
algorithmsParamsJson match {
case JArray(s) => s.map { algorithmParamsJValue =>
val eap = algorithmParamsJValue.extract[CreateWorkflow.AlgorithmParams]
(
eap.name,
WorkflowUtils.extractParams(
engineLanguage,
compact(render(eap.params)),
algorithmClassMap(eap.name),
jsonExtractor)
)
}
case _ => Nil
}
} getOrElse Seq(("", EmptyParams()))
logger.info(s"Extracting serving params...")
val servingParams: (String, Params) =
WorkflowUtils.getParamsFromJsonByFieldAndClass(
variantJson,
"serving",
servingClassMap,
engineLanguage,
jsonExtractor)
logger.info(s"Serving params: $servingParams")
new EngineParams(
dataSourceParams = dataSourceParams,
preparatorParams = preparatorParams,
algorithmParamsList = algorithmsParams,
servingParams = servingParams)
}
private[predictionio] def engineInstanceToEngineParams(
engineInstance: EngineInstance,
jsonExtractor: JsonExtractorOption): EngineParams = {
implicit val formats = DefaultFormats
val engineLanguage = EngineLanguage.Scala
val dataSourceParamsWithName: (String, Params) = {
val (name, params) =
read[(String, JValue)](engineInstance.dataSourceParams)
if (!dataSourceClassMap.contains(name)) {
logger.error(s"Unable to find datasource class with name '$name'" +
" defined in Engine.")
sys.exit(1)
}
val extractedParams = WorkflowUtils.extractParams(
engineLanguage,
compact(render(params)),
dataSourceClassMap(name),
jsonExtractor)
(name, extractedParams)
}
val preparatorParamsWithName: (String, Params) = {
val (name, params) =
read[(String, JValue)](engineInstance.preparatorParams)
if (!preparatorClassMap.contains(name)) {
logger.error(s"Unable to find preparator class with name '$name'" +
" defined in Engine.")
sys.exit(1)
}
val extractedParams = WorkflowUtils.extractParams(
engineLanguage,
compact(render(params)),
preparatorClassMap(name),
jsonExtractor)
(name, extractedParams)
}
val algorithmsParamsWithNames =
read[Seq[(String, JValue)]](engineInstance.algorithmsParams).map {
case (algoName, params) =>
val extractedParams = WorkflowUtils.extractParams(
engineLanguage,
compact(render(params)),
algorithmClassMap(algoName),
jsonExtractor)
(algoName, extractedParams)
}
val servingParamsWithName: (String, Params) = {
val (name, params) = read[(String, JValue)](engineInstance.servingParams)
if (!servingClassMap.contains(name)) {
logger.error(s"Unable to find serving class with name '$name'" +
" defined in Engine.")
sys.exit(1)
}
val extractedParams = WorkflowUtils.extractParams(
engineLanguage,
compact(render(params)),
servingClassMap(name),
jsonExtractor)
(name, extractedParams)
}
new EngineParams(
dataSourceParams = dataSourceParamsWithName,
preparatorParams = preparatorParamsWithName,
algorithmParamsList = algorithmsParamsWithNames,
servingParams = servingParamsWithName)
}
}
/** This object contains concrete implementation for some methods of the
* [[Engine]] class.
*
* @group Engine
*/
object Engine {
private type EX = Int // evaluation data set index
private type AX = Int // algorithm index
private type QX = Long // query index
@transient lazy private val logger = Logger[this.type]
/** Helper class to accept either a single data source, or a map of data
* sources, with a companion object providing implicit conversions, so
* using this class directly is not necessary.
*
* @tparam TD Training data class
* @tparam EI Evaluation information class
* @tparam Q Input query class
* @tparam A Actual result class
*/
class DataSourceMap[TD, EI, Q, A](
val m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]) {
def this(c: Class[_ <: BaseDataSource[TD, EI, Q, A]]) = this(Map("" -> c))
}
/** Companion object providing implicit conversions, so using this directly
* is not necessary.
*/
object DataSourceMap {
implicit def cToMap[TD, EI, Q, A](
c: Class[_ <: BaseDataSource[TD, EI, Q, A]]):
DataSourceMap[TD, EI, Q, A] = new DataSourceMap(c)
implicit def mToMap[TD, EI, Q, A](
m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]):
DataSourceMap[TD, EI, Q, A] = new DataSourceMap(m)
}
/** Helper class to accept either a single preparator, or a map of
* preparators, with a companion object providing implicit conversions, so
* using this class directly is not necessary.
*
* @tparam TD Training data class
* @tparam PD Prepared data class
*/
class PreparatorMap[TD, PD](
val m: Map[String, Class[_ <: BasePreparator[TD, PD]]]) {
def this(c: Class[_ <: BasePreparator[TD, PD]]) = this(Map("" -> c))
}
/** Companion object providing implicit conversions, so using this directly
* is not necessary.
*/
object PreparatorMap {
implicit def cToMap[TD, PD](
c: Class[_ <: BasePreparator[TD, PD]]):
PreparatorMap[TD, PD] = new PreparatorMap(c)
implicit def mToMap[TD, PD](
m: Map[String, Class[_ <: BasePreparator[TD, PD]]]):
PreparatorMap[TD, PD] = new PreparatorMap(m)
}
/** Helper class to accept either a single serving, or a map of serving, with
* a companion object providing implicit conversions, so using this class
* directly is not necessary.
*
* @tparam Q Input query class
* @tparam P Predicted result class
*/
class ServingMap[Q, P](
val m: Map[String, Class[_ <: BaseServing[Q, P]]]) {
def this(c: Class[_ <: BaseServing[Q, P]]) = this(Map("" -> c))
}
/** Companion object providing implicit conversions, so using this directly
* is not necessary.
*/
object ServingMap {
implicit def cToMap[Q, P](
c: Class[_ <: BaseServing[Q, P]]): ServingMap[Q, P] =
new ServingMap(c)
implicit def mToMap[Q, P](
m: Map[String, Class[_ <: BaseServing[Q, P]]]): ServingMap[Q, P] =
new ServingMap(m)
}
/** Convenience method for returning an instance of [[Engine]].
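*
* A minimal sketch, assuming hypothetical component classes. The single
* data source, preparator, and serving classes are implicitly wrapped into
* maps keyed by the empty string:
*
* {{{
* val engine = Engine(
*   classOf[MyDataSource],
*   classOf[MyPreparator],
*   Map("algo" -> classOf[MyAlgorithm]),
*   classOf[MyServing])
* }}}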
*
* @param dataSourceMap Accepts either an instance of Class of the data
*        source, or a Map of data source classes (implicitly
*        converted to [[DataSourceMap]]).
* @param preparatorMap Accepts either an instance of Class of the
*        preparator, or a Map of preparator classes
*        (implicitly converted to [[PreparatorMap]]).
* @param algorithmClassMap Accepts a Map of algorithm classes.
* @param servingMap Accepts either an instance of Class of the serving, or
*        a Map of serving classes (implicitly converted to
*        [[ServingMap]]).
* @tparam TD Training data class
* @tparam EI Evaluation information class
* @tparam PD Prepared data class
* @tparam Q Input query class
* @tparam P Predicted result class
* @tparam A Actual result class
* @return An instance of [[Engine]]
*/
def apply[TD, EI, PD, Q, P, A](
dataSourceMap: DataSourceMap[TD, EI, Q, A],
preparatorMap: PreparatorMap[TD, PD],
algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
servingMap: ServingMap[Q, P]): Engine[TD, EI, PD, Q, P, A] = new Engine(
dataSourceMap.m,
preparatorMap.m,
algorithmClassMap,
servingMap.m
)
/** Provides concrete implementation of training for [[Engine]].
*
* @param sc An instance of SparkContext
* @param dataSource An instance of data source
* @param preparator An instance of preparator
* @param algorithmList A list of algorithm instances
* @param params An instance of [[WorkflowParams]] that controls the training
* process.
* @tparam TD Training data class
* @tparam PD Prepared data class
* @tparam Q Input query class
* @return A list of trained models
*/
def train[TD, PD, Q](
sc: SparkContext,
dataSource: BaseDataSource[TD, _, Q, _],
preparator: BasePreparator[TD, PD],
algorithmList: Seq[BaseAlgorithm[PD, _, Q, _]],
params: WorkflowParams
): Seq[Any] = {
logger.info("EngineWorkflow.train")
logger.info(s"DataSource: $dataSource")
logger.info(s"Preparator: $preparator")
logger.info(s"AlgorithmList: $algorithmList")
if (params.skipSanityCheck) {
logger.info("Data sanity check is off.")
} else {
logger.info("Data sanity check is on.")
}
val td = try {
dataSource.readTrainingBase(sc)
} catch {
case e: StorageClientException =>
logger.error(s"Error occurred reading from data source. (Reason: " +
e.getMessage + ") Please see the log for debugging details.", e)
sys.exit(1)
}
if (!params.skipSanityCheck) {
td match {
case sanityCheckable: SanityCheck => {
logger.info(s"${td.getClass.getName} supports data sanity" +
" check. Performing check.")
sanityCheckable.sanityCheck()
}
case _ => {
logger.info(s"${td.getClass.getName} does not support" +
" data sanity check. Skipping check.")
}
}
}
if (params.stopAfterRead) {
logger.info("Stopping here because --stop-after-read is set.")
throw StopAfterReadInterruption()
}
val pd = preparator.prepareBase(sc, td)
if (!params.skipSanityCheck) {
pd match {
case sanityCheckable: SanityCheck => {
logger.info(s"${pd.getClass.getName} supports data sanity" +
" check. Performing check.")
sanityCheckable.sanityCheck()
}
case _ => {
logger.info(s"${pd.getClass.getName} does not support" +
" data sanity check. Skipping check.")
}
}
}
if (params.stopAfterPrepare) {
logger.info("Stopping here because --stop-after-prepare is set.")
throw StopAfterPrepareInterruption()
}
val models: Seq[Any] = algorithmList.map(_.trainBase(sc, pd))
if (!params.skipSanityCheck) {
models.foreach { model =>
model match {
case sanityCheckable: SanityCheck => {
logger.info(s"${model.getClass.getName} supports data sanity" +
" check. Performing check.")
sanityCheckable.sanityCheck()
}
case _ => {
logger.info(s"${model.getClass.getName} does not support" +
" data sanity check. Skipping check.")
}
}
}
}
logger.info("EngineWorkflow.train completed")
models
}
/** Provides concrete implementation of evaluation for [[Engine]].
*
* @param sc An instance of SparkContext
* @param dataSource An instance of data source
* @param preparator An instance of preparator
* @param algorithmList A list of algorithm instances
* @param serving An instance of serving
* @tparam TD Training data class
* @tparam PD Prepared data class
* @tparam Q Input query class
* @tparam P Predicted result class
* @tparam A Actual result class
* @tparam EI Evaluation information class
* @return A list of evaluation information and RDDs of (query, predicted
*         result, actual result) tuples.
*/
def eval[TD, PD, Q, P, A, EI](
sc: SparkContext,
dataSource: BaseDataSource[TD, EI, Q, A],
preparator: BasePreparator[TD, PD],
algorithmList: Seq[BaseAlgorithm[PD, _, Q, P]],
serving: BaseServing[Q, P]): Seq[(EI, RDD[(Q, P, A)])] = {
logger.info(s"DataSource: $dataSource")
logger.info(s"Preparator: $preparator")
logger.info(s"AlgorithmList: $algorithmList")
logger.info(s"Serving: $serving")
val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = algorithmList
.zipWithIndex
.map(_.swap)
.toMap
val algoCount = algoMap.size
val evalTupleMap: Map[EX, (TD, EI, RDD[(Q, A)])] = dataSource
.readEvalBase(sc)
.zipWithIndex
.map(_.swap)
.toMap
val evalCount = evalTupleMap.size
val evalTrainMap: Map[EX, TD] = evalTupleMap.mapValues(_._1)
val evalInfoMap: Map[EX, EI] = evalTupleMap.mapValues(_._2)
val evalQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalTupleMap
.mapValues(_._3)
.mapValues{ _.zipWithUniqueId().map(_.swap) }
val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td =>
preparator.prepareBase(sc, td)
}
val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd =>
algoMap.mapValues(_.trainBase(sc, pd))
}
val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.mapValues { qas =>
qas.map { case (qx, (q, a)) => (qx, (serving.supplementBase(q), a)) }
}
val algoPredictsMap: Map[EX, RDD[(QX, Seq[P])]] = (0 until evalCount)
.map { ex =>
val modelMap: Map[AX, Any] = algoModelsMap(ex)
val qs: RDD[(QX, Q)] = suppQAsMap(ex).mapValues(_._1)
val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
.map { ax =>
val algo = algoMap(ax)
val model = modelMap(ax)
val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(sc, model, qs)
val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) =>
(qx, (ax, p))
}
predicts
}
val unionAlgoPredicts: RDD[(QX, Seq[P])] = sc.union(algoPredicts)
.groupByKey()
.mapValues { ps =>
assert(ps.size == algoCount, "Must have same length as algoCount")
ps.toSeq.sortBy(_._1).map(_._2)
}
(ex, unionAlgoPredicts)
}
.toMap
val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
.map { case (ex, psMap) =>
// The query passed to serving.serve is the original one, not
// supplemented.
val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap)
.map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) }
val qpaMap: RDD[(Q, P, A)] = qpsaMap.map {
case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a)
}
(ex, qpaMap)
}
(0 until evalCount).map { ex =>
(evalInfoMap(ex), servingQPAMap(ex))
}
}
}
/** Mix in this trait for queries that contain prId (PredictedResultId).
* This is useful when your engine expects queries to also be associated with
* prId keys when the feedback loop is enabled.
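*
* A minimal sketch, assuming a hypothetical Query class:
*
* {{{
* case class Query(user: String, num: Int, override val prId: String = "")
*   extends WithPrId
* }}}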
*
* @group Helper
*/
@deprecated("To be removed in future releases.", "0.9.2")
trait WithPrId {
val prId: String = ""
}