/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.controller

import io.prediction.annotation.Experimental
import io.prediction.core.BaseAlgorithm
import io.prediction.core.BaseDataSource
import io.prediction.core.BasePreparator
import io.prediction.core.BaseServing
import io.prediction.core.Doer

import _root_.java.util.NoSuchElementException

import grizzled.slf4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.{ HashMap => MutableHashMap }
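
/** Companion object of [[FastEvalEngineWorkflow]]. Implements the
  * prefix-keyed, per-stage caching that backs [[FastEvalEngine.batchEval]].
  */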
@Experimental
object FastEvalEngineWorkflow {
@transient lazy val logger = Logger[this.type]
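  // Index types used as cache keys throughout the workflow:
  //   EX: evaluation (data split) index
  //   AX: algorithm index within one set of engine params
  //   QX: query index, unique within an evaluation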
type EX = Int
type AX = Int
type QX = Long
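
  // The Prefix case classes below are cache keys: each captures the
  // controller params from the start of the pipeline up to one stage, so
  // engine params sharing a common prefix can reuse that stage's results.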
case class DataSourcePrefix(dataSourceParams: (String, Params)) {
def this(pp: PreparatorPrefix) = this(pp.dataSourceParams)
def this(ap: AlgorithmsPrefix) = this(ap.dataSourceParams)
def this(sp: ServingPrefix) = this(sp.dataSourceParams)
}
  case class PreparatorPrefix(
    dataSourceParams: (String, Params),
    preparatorParams: (String, Params)) {
    def this(ap: AlgorithmsPrefix) = {
      this(ap.dataSourceParams, ap.preparatorParams)
    }
  }
  case class AlgorithmsPrefix(
    dataSourceParams: (String, Params),
    preparatorParams: (String, Params),
    algorithmParamsList: Seq[(String, Params)]) {
    def this(sp: ServingPrefix) = {
      this(sp.dataSourceParams, sp.preparatorParams, sp.algorithmParamsList)
    }
  }
  case class ServingPrefix(
    dataSourceParams: (String, Params),
    preparatorParams: (String, Params),
    algorithmParamsList: Seq[(String, Params)],
    servingParams: (String, Params)) {
    def this(ep: EngineParams) = this(
      ep.dataSourceParams,
      ep.preparatorParams,
      ep.algorithmParamsList,
      ep.servingParams)
  }
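
  // Reads evaluation data once per distinct data source params and caches it.
  // Each (query, answer) pair is tagged with a unique QX so per-algorithm
  // predictions can later be joined back to the originating query.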
def getDataSourceResult[TD, EI, PD, Q, P, A](
workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
prefix: DataSourcePrefix)
: Map[EX, (TD, EI, RDD[(QX, (Q, A))])] = {
val cache = workflow.dataSourceCache
if (!cache.contains(prefix)) {
val dataSource = Doer(
workflow.engine.dataSourceClassMap(prefix.dataSourceParams._1),
prefix.dataSourceParams._2)
val result = dataSource
.readEvalBase(workflow.sc)
.map { case (td, ei, qaRDD) => {
(td, ei, qaRDD.zipWithUniqueId().map(_.swap))
}}
.zipWithIndex
.map(_.swap)
.toMap
      cache += (prefix -> result)
}
cache(prefix)
}
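
  // Prepares training data once per distinct (data source, preparator)
  // params prefix, reusing the cached data source result.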
def getPreparatorResult[TD, EI, PD, Q, P, A](
workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
prefix: PreparatorPrefix): Map[EX, PD] = {
val cache = workflow.preparatorCache
if (!cache.contains(prefix)) {
val preparator = Doer(
workflow.engine.preparatorClassMap(prefix.preparatorParams._1),
prefix.preparatorParams._2)
val result = getDataSourceResult(
workflow = workflow,
prefix = new DataSourcePrefix(prefix))
.mapValues { case (td, _, _) => preparator.prepareBase(workflow.sc, td) }
      cache += (prefix -> result)
}
cache(prefix)
}
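
  // Trains each algorithm on every evaluation split, batch-predicts all
  // queries, and unions the predictions per query (keyed by QX) in algorithm
  // order. Evaluation splits are processed in parallel via .par.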
def computeAlgorithmsResult[TD, EI, PD, Q, P, A](
workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = prefix.algorithmParamsList
.map { case (algoName, algoParams) => {
try {
Doer(workflow.engine.algorithmClassMap(algoName), algoParams)
} catch {
case e: NoSuchElementException => {
val algorithmClassMap = workflow.engine.algorithmClassMap
if (algoName == "") {
logger.error("Empty algorithm name supplied but it could not " +
"match with any algorithm in the engine's definition. " +
"Existing algorithm name(s) are: " +
s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
} else {
logger.error(s"${algoName} cannot be found in the engine's " +
"definition. Existing algorithm name(s) are: " +
s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
}
sys.exit(1)
}
}
}}
.zipWithIndex
.map(_.swap)
.toMap
val algoCount = algoMap.size
    // Train: build one model per (evaluation split, algorithm) pair.
    val algoModelsMap: Map[EX, Map[AX, Any]] = getPreparatorResult(
      workflow,
      new PreparatorPrefix(prefix))
    .mapValues {
      pd => algoMap.mapValues(_.trainBase(workflow.sc, pd))
    }
    // Predict: apply every trained model to all queries of each evaluation.
val dataSourceResult =
FastEvalEngineWorkflow.getDataSourceResult(
workflow = workflow,
prefix = new DataSourcePrefix(prefix))
val algoResult: Map[EX, RDD[(QX, Seq[P])]] = dataSourceResult
.par
.map { case (ex, (td, ei, iqaRDD)) => {
val modelsMap: Map[AX, Any] = algoModelsMap(ex)
val qs: RDD[(QX, Q)] = iqaRDD.mapValues(_._1)
val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
.map { ax => {
val algo = algoMap(ax)
val model = modelsMap(ax)
val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(
workflow.sc,
model,
qs)
val predicts: RDD[(QX, (AX, P))] = rawPredicts.map {
case (qx, p) => (qx, (ax, p))
}
predicts
}}
val unionAlgoPredicts: RDD[(QX, Seq[P])] = workflow.sc
.union(algoPredicts)
.groupByKey
.mapValues { ps => {
          assert(ps.size == algoCount,
            s"Expected $algoCount prediction(s) per query, one per " +
            s"algorithm, but found ${ps.size}")
ps.toSeq.sortBy(_._1).map(_._2)
}}
(ex, unionAlgoPredicts)
}}
.seq
.toMap
algoResult
}
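
  // Memoizing wrapper around computeAlgorithmsResult, keyed by the
  // (data source, preparator, algorithms) params prefix.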
def getAlgorithmsResult[TD, EI, PD, Q, P, A](
workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
val cache = workflow.algorithmsCache
if (!cache.contains(prefix)) {
val result = computeAlgorithmsResult(workflow, prefix)
      cache += (prefix -> result)
}
cache(prefix)
}
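
  // Joins each query's predictions with its expected answer and applies the
  // serving layer to reduce the per-algorithm predictions to a single P.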
def getServingResult[TD, EI, PD, Q, P, A](
workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
prefix: ServingPrefix)
: Seq[(EI, RDD[(Q, P, A)])] = {
val cache = workflow.servingCache
if (!cache.contains(prefix)) {
val serving = Doer(
workflow.engine.servingClassMap(prefix.servingParams._1),
prefix.servingParams._2)
val algoPredictsMap = getAlgorithmsResult(
workflow = workflow,
prefix = new AlgorithmsPrefix(prefix))
val dataSourceResult = getDataSourceResult(
workflow = workflow,
prefix = new DataSourcePrefix(prefix))
val evalQAsMap = dataSourceResult.mapValues(_._3)
val evalInfoMap = dataSourceResult.mapValues(_._2)
val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
.map { case (ex, psMap) => {
val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
      val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap)
        .map { case (qx, (ps, (q, a))) => (qx, q, ps, a) }
      val qpaMap: RDD[(Q, P, A)] = qpsaMap.map {
        case (_, q, ps, a) => (q, serving.serveBase(q, ps), a)
      }
(ex, qpaMap)
}}
val servingResult = (0 until evalQAsMap.size).map { ex => {
(evalInfoMap(ex), servingQPAMap(ex))
}}
.toSeq
      cache += (prefix -> servingResult)
}
cache(prefix)
}
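
  // Entry point used by FastEvalEngine.batchEval: evaluates each set of
  // engine params, reusing cached stage results wherever prefixes overlap.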
def get[TD, EI, PD, Q, P, A](
workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
engineParamsList: Seq[EngineParams])
: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
engineParamsList.map { engineParams => {
(engineParams,
getServingResult(workflow, new ServingPrefix(engineParams)))
}}
}
}
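
/** Holds the state of one [[FastEvalEngine]] evaluation run: the
  * SparkContext plus one cache per pipeline stage, keyed by params prefix,
  * so a stage is computed only once for each distinct prefix.
  */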
@Experimental
class FastEvalEngineWorkflow[TD, EI, PD, Q, P, A](
val engine: FastEvalEngine[TD, EI, PD, Q, P, A],
val sc: SparkContext,
val workflowParams: WorkflowParams) extends Serializable {
import io.prediction.controller.FastEvalEngineWorkflow._
type DataSourceResult = Map[EX, (TD, EI, RDD[(QX, (Q, A))])]
type PreparatorResult = Map[EX, PD]
type AlgorithmsResult = Map[EX, RDD[(QX, Seq[P])]]
type ServingResult = Seq[(EI, RDD[(Q, P, A)])]
val dataSourceCache = MutableHashMap[DataSourcePrefix, DataSourceResult]()
val preparatorCache = MutableHashMap[PreparatorPrefix, PreparatorResult]()
val algorithmsCache = MutableHashMap[AlgorithmsPrefix, AlgorithmsResult]()
val servingCache = MutableHashMap[ServingPrefix, ServingResult]()
}
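
// A minimal usage sketch for the FastEvalEngine defined below. MyDataSource,
// MyPreparator, MyAlgorithm, and MyServing stand for hypothetical controller
// classes supplied by an engine; they are not part of this file:
//
//   val engine = new FastEvalEngine(
//     Map("" -> classOf[MyDataSource]),
//     Map("" -> classOf[MyPreparator]),
//     Map("algo" -> classOf[MyAlgorithm]),
//     Map("" -> classOf[MyServing]))
//
//   // Engine params that share a prefix (e.g. identical data source and
//   // preparator params) reuse cached intermediate results across runs.
//   val results = engine.batchEval(sc, engineParamsList, WorkflowParams())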
/** FastEvalEngine is a subclass of [[Engine]] that exploits the immutability
  * of controllers to optimize the evaluation process: every pipeline stage
  * result is cached under the params prefix that produced it, so engine
  * params sharing a common prefix are evaluated without recomputing the
  * shared stages.
  */
@Experimental
class FastEvalEngine[TD, EI, PD, Q, P, A](
dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]],
preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
extends Engine[TD, EI, PD, Q, P, A](
dataSourceClassMap,
preparatorClassMap,
algorithmClassMap,
servingClassMap) {
@transient override lazy val logger = Logger[this.type]
override def eval(
sc: SparkContext,
engineParams: EngineParams,
params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = {
logger.info("FastEvalEngine.eval")
batchEval(sc, Seq(engineParams), params).head._2
}
override def batchEval(
sc: SparkContext,
engineParamsList: Seq[EngineParams],
params: WorkflowParams)
: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
val fastEngineWorkflow = new FastEvalEngineWorkflow(
this, sc, params)
FastEvalEngineWorkflow.get(
fastEngineWorkflow,
engineParamsList)
}
}