| package io.prediction.controller |
| |
| import io.prediction.workflow.PersistentModelManifest |
| import io.prediction.workflow.SharedSparkContext |
| import io.prediction.workflow.StopAfterPrepareInterruption |
| import io.prediction.workflow.StopAfterReadInterruption |
| |
| import grizzled.slf4j.Logger |
| import io.prediction.workflow.WorkflowParams |
| import org.apache.spark.rdd.RDD |
| import org.scalatest.Inspectors._ |
| import org.scalatest.Matchers._ |
| import org.scalatest.FunSuite |
| import org.scalatest.Inside |
| |
| import scala.util.Random |
| |
/** Tests for the instance-level `Engine` API (`train`, `prepareDeploy` and
  * `eval`), run against the sample components imported from `Engine0`.
  *
  * Throughout this suite, `Unit` inside an expected `Seq` is the `scala.Unit`
  * companion object (not the unit value `()`); the assertions compare it with
  * whatever `engine.train` returns for a model that is not persisted.
  */
class EngineSuite
  extends FunSuite with Inside with SharedSparkContext {
  import io.prediction.controller.Engine0._
  @transient lazy val logger = Logger[this.type]

  // Trains a single PAlgo2 registered under the empty name.
  test("Engine.train") {
    val engine = new Engine(
      classOf[PDataSource2],
      classOf[PPreparator1],
      Map("" -> classOf[PAlgo2]),
      classOf[LServing1])

    val engineParams = EngineParams(
      dataSourceParams = PDataSource2.Params(0),
      preparatorParams = PPreparator1.Params(1),
      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
      servingParams = LServing1.Params(3))

    val models = engine.train(
      sc,
      engineParams,
      engineInstanceId = "",
      params = WorkflowParams())

    // PAlgo2.Model doesn't have IPersistentModel trait implemented. Hence the
    // model extract after train is Unit.
    models should contain theSameElementsAs Seq(Unit)
  }

  // One PAlgo2 and two PAlgo3 instances: the PAlgo2 slot is expected to be
  // Unit, the PAlgo3 slots are expected to be PersistentModelManifest entries
  // carrying the model class name.
  test("Engine.train persisting PAlgo.Model") {
    val engine = new Engine(
      classOf[PDataSource2],
      classOf[PPreparator1],
      Map(
        "PAlgo2" -> classOf[PAlgo2],
        "PAlgo3" -> classOf[PAlgo3]
      ),
      classOf[LServing1])

    val engineParams = EngineParams(
      dataSourceParams = PDataSource2.Params(0),
      preparatorParams = PPreparator1.Params(1),
      algorithmParamsList = Seq(
        ("PAlgo2", PAlgo2.Params(2)),
        ("PAlgo3", PAlgo3.Params(21)),
        ("PAlgo3", PAlgo3.Params(22))
      ),
      servingParams = LServing1.Params(3))

    // Reference models; only their class names are used to build the
    // expected manifests.
    val pd = ProcessedData(1, TrainingData(0))
    val model21 = PAlgo3.Model(21, pd)
    val model22 = PAlgo3.Model(22, pd)

    val models = engine.train(
      sc,
      engineParams,
      engineInstanceId = "",
      params = WorkflowParams())

    val pModel21 = PersistentModelManifest(model21.getClass.getName)
    val pModel22 = PersistentModelManifest(model22.getClass.getName)

    models should contain theSameElementsAs Seq(Unit, pModel21, pModel22)
  }

  // Two LAlgo2 instances and one LAlgo3 instance (LAlgo1 is registered but
  // not selected in algorithmParamsList). The LAlgo2 slots are expected to be
  // manifests, while the LAlgo3 slot is expected to hold the model itself.
  test("Engine.train persisting LAlgo.Model") {
    val engine = Engine(
      classOf[LDataSource1],
      classOf[LPreparator1],
      Map(
        "LAlgo1" -> classOf[LAlgo1],
        "LAlgo2" -> classOf[LAlgo2],
        "LAlgo3" -> classOf[LAlgo3]
      ),
      classOf[LServing1])

    val engineParams = EngineParams(
      dataSourceParams = LDataSource1.Params(0),
      preparatorParams = LPreparator1.Params(1),
      algorithmParamsList = Seq(
        ("LAlgo2", LAlgo2.Params(20)),
        ("LAlgo2", LAlgo2.Params(21)),
        ("LAlgo3", LAlgo3.Params(22))),
      servingParams = LServing1.Params(3))

    val pd = ProcessedData(1, TrainingData(0))
    val model20 = LAlgo2.Model(20, pd)
    val model21 = LAlgo2.Model(21, pd)
    val model22 = LAlgo3.Model(22, pd)

    val models = engine.train(
      sc,
      engineParams,
      engineInstanceId = "",
      params = WorkflowParams())

    val pModel20 = PersistentModelManifest(model20.getClass.getName)
    val pModel21 = PersistentModelManifest(model21.getClass.getName)

    models should contain theSameElementsAs Seq(pModel20, pModel21, model22)
  }

  // Mix of PAlgo and NAlgo instances. Expected result per slot: Unit for
  // PAlgo2, manifests for PAlgo3 (x2) and NAlgo2, and the models themselves
  // for NAlgo3 (x2).
  test("Engine.train persisting P&NAlgo.Model") {
    val engine = new Engine(
      classOf[PDataSource2],
      classOf[PPreparator1],
      Map(
        "PAlgo2" -> classOf[PAlgo2],
        "PAlgo3" -> classOf[PAlgo3],
        "NAlgo2" -> classOf[NAlgo2],
        "NAlgo3" -> classOf[NAlgo3]
      ),
      classOf[LServing1])

    val engineParams = EngineParams(
      dataSourceParams = PDataSource2.Params(0),
      preparatorParams = PPreparator1.Params(1),
      algorithmParamsList = Seq(
        ("PAlgo2", PAlgo2.Params(20)),
        ("PAlgo3", PAlgo3.Params(21)),
        ("PAlgo3", PAlgo3.Params(22)),
        ("NAlgo2", NAlgo2.Params(23)),
        ("NAlgo3", NAlgo3.Params(24)),
        ("NAlgo3", NAlgo3.Params(25))
      ),
      servingParams = LServing1.Params(3))

    val pd = ProcessedData(1, TrainingData(0))
    val model21 = PAlgo3.Model(21, pd)
    val model22 = PAlgo3.Model(22, pd)
    val model23 = NAlgo2.Model(23, pd)
    val model24 = NAlgo3.Model(24, pd)
    val model25 = NAlgo3.Model(25, pd)

    val models = engine.train(
      sc,
      engineParams,
      engineInstanceId = "",
      params = WorkflowParams())

    val pModel21 = PersistentModelManifest(model21.getClass.getName)
    val pModel22 = PersistentModelManifest(model22.getClass.getName)
    val pModel23 = PersistentModelManifest(model23.getClass.getName)

    models should contain theSameElementsAs Seq(
      Unit, pModel21, pModel22, pModel23, model24, model25)
  }

  // Round trip: the output of train is fed to prepareDeploy under the same
  // engine instance id, and every slot is expected to come back as the
  // concrete model.
  test("Engine.prepareDeploy PAlgo") {
    val engine = new Engine(
      classOf[PDataSource2],
      classOf[PPreparator1],
      Map(
        "PAlgo2" -> classOf[PAlgo2],
        "PAlgo3" -> classOf[PAlgo3],
        "NAlgo2" -> classOf[NAlgo2],
        "NAlgo3" -> classOf[NAlgo3]
      ),
      classOf[LServing1])

    val engineParams = EngineParams(
      dataSourceParams = PDataSource2.Params(0),
      preparatorParams = PPreparator1.Params(1),
      algorithmParamsList = Seq(
        ("PAlgo2", PAlgo2.Params(20)),
        ("PAlgo3", PAlgo3.Params(21)),
        ("PAlgo3", PAlgo3.Params(22)),
        ("NAlgo2", NAlgo2.Params(23)),
        ("NAlgo3", NAlgo3.Params(24)),
        ("NAlgo3", NAlgo3.Params(25))
      ),
      servingParams = LServing1.Params(3))

    val pd = ProcessedData(1, TrainingData(0))
    val model20 = PAlgo2.Model(20, pd)
    val model21 = PAlgo3.Model(21, pd)
    val model22 = PAlgo3.Model(22, pd)
    val model23 = NAlgo2.Model(23, pd)
    val model24 = NAlgo3.Model(24, pd)
    val model25 = NAlgo3.Model(25, pd)

    // A random suffix gives each run its own engine instance id.
    val rand = new Random()

    val fakeEngineInstanceId = s"FakeInstanceId-${rand.nextLong()}"

    val persistedModels = engine.train(
      sc,
      engineParams,
      engineInstanceId = fakeEngineInstanceId,
      params = WorkflowParams()
    )

    val deployableModels = engine.prepareDeploy(
      sc,
      engineParams,
      fakeEngineInstanceId,
      persistedModels,
      params = WorkflowParams()
    )

    deployableModels should contain theSameElementsAs Seq(
      model20, model21, model22, model23, model24, model25)
  }

  // Evaluation with `en` evaluation sets of `qn` queries each; checks the
  // (query, prediction, actual) triples of every set.
  test("Engine.eval") {
    val engine = new Engine(
      classOf[PDataSource2],
      classOf[PPreparator1],
      Map("" -> classOf[PAlgo2]),
      classOf[LServing1])

    val qn = 10
    val en = 3

    val engineParams = EngineParams(
      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
      preparatorParams = PPreparator1.Params(1),
      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
      servingParams = LServing1.Params(3))

    val algoCount = engineParams.algorithmParamsList.size
    val pd = ProcessedData(1, TrainingData(0))
    val model0 = PAlgo2.Model(2, pd)

    val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())

    // Size checks keep the forAll inspections below from passing vacuously.
    evalDataSet should have size en

    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
      val (evalInfo, qpaRDD) = evalData
      evalInfo shouldBe EvalInfo(0)

      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect

      qpaSeq should have size qn

      forAll (qpaSeq) { case (q, p, a) =>
        val Query(qId, qEx, qQx) = q
        val Actual(aId, aEx, aQx) = a
        // Query and actual must agree with each other and with the index of
        // the evaluation set they were read from.
        qId shouldBe aId
        qEx shouldBe ex
        aEx shouldBe ex
        qQx shouldBe aQx

        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
          // The outer prediction carries the serving id (3) and wraps one
          // prediction per algorithm.
          pId shouldBe 3
          pQ shouldBe q
          pModels shouldBe None
          pPs should have size algoCount
          pPs shouldBe Seq(
            Prediction(id = 2, q = q, models = Some(model0)))
        }}
      }
    }}
  }
}
| |
/** Tests for the static `Engine.train` entry point, which takes already
  * constructed data source, preparator and algorithm instances from `Engine0`.
  */
class EngineTrainSuite extends FunSuite with SharedSparkContext {
  import io.prediction.controller.Engine0._
  val defaultWorkflowParams: WorkflowParams = WorkflowParams()

  // All-parallel pipeline: the returned models are compared directly with
  // the expected model values.
  test("Parallel DS/P/Algos") {
    val processed = ProcessedData(1, TrainingData(0))
    val expectedModels = Seq(
      PAlgo0.Model(2, processed),
      PAlgo1.Model(3, processed),
      PAlgo0.Model(4, processed))

    val trained = Engine.train(
      sc,
      new PDataSource0(0),
      new PPreparator0(1),
      Seq(new PAlgo0(2), new PAlgo1(3), new PAlgo0(4)),
      defaultWorkflowParams)

    trained should contain theSameElementsAs expectedModels
  }

  // All-local pipeline: every returned model is expected to be an RDD whose
  // collected content is exactly the corresponding expected model.
  test("Local DS/P/Algos") {
    val processed = ProcessedData(1, TrainingData(0))
    val expectedModels = Seq(
      LAlgo0.Model(2, processed),
      LAlgo1.Model(3, processed),
      LAlgo0.Model(4, processed))

    val trained = Engine.train(
      sc,
      new LDataSource0(0),
      new LPreparator0(1),
      Seq(new LAlgo0(2), new LAlgo1(3), new LAlgo0(4)),
      defaultWorkflowParams)

    forAll(trained.zip(expectedModels)) { case (actual, wanted) =>
      actual shouldBe a [RDD[_]]
      val collected = actual.asInstanceOf[RDD[_]].collect
      collected should contain theSameElementsAs Seq(wanted)
    }
  }

  // Parallel data source and preparator feeding NAlgo instances.
  test("P2L DS/P/Algos") {
    val processed = ProcessedData(1, TrainingData(0))
    val expectedModels = Seq(
      NAlgo0.Model(2, processed),
      NAlgo1.Model(3, processed),
      NAlgo0.Model(4, processed))

    val trained = Engine.train(
      sc,
      new PDataSource0(0),
      new PPreparator0(1),
      Seq(new NAlgo0(2), new NAlgo1(3), new NAlgo0(4)),
      defaultWorkflowParams)

    trained should contain theSameElementsAs expectedModels
  }

  // With stopAfterRead set, training must end in StopAfterReadInterruption.
  test("Parallel DS/P/Algos Stop-After-Read") {
    val stopAfterReadParams = defaultWorkflowParams.copy(stopAfterRead = true)

    an [StopAfterReadInterruption] should be thrownBy {
      Engine.train(
        sc,
        new PDataSource0(0),
        new PPreparator0(1),
        Seq(new PAlgo0(2), new PAlgo1(3), new PAlgo0(4)),
        stopAfterReadParams)
    }
  }

  // With stopAfterPrepare set, training must end in
  // StopAfterPrepareInterruption.
  test("Parallel DS/P/Algos Stop-After-Prepare") {
    val stopAfterPrepareParams =
      defaultWorkflowParams.copy(stopAfterPrepare = true)

    an [StopAfterPrepareInterruption] should be thrownBy {
      Engine.train(
        sc,
        new PDataSource0(0),
        new PPreparator0(1),
        Seq(new PAlgo0(2), new PAlgo1(3), new PAlgo0(4)),
        stopAfterPrepareParams)
    }
  }

  // PDataSource3 is built with error = true; with the sanity check enabled
  // training is expected to fail with an AssertionError.
  test("Parallel DS/P/Algos Dirty TrainingData") {
    val checkedParams = defaultWorkflowParams.copy(skipSanityCheck = false)

    an [AssertionError] should be thrownBy {
      Engine.train(
        sc,
        new PDataSource3(0, error = true),
        new PPreparator0(1),
        Seq(new PAlgo0(2), new PAlgo1(3), new PAlgo0(4)),
        checkedParams)
    }
  }

  // Same dirty data source, but with the sanity check skipped: training is
  // expected to complete and the models to carry the error-flagged
  // training data.
  test("Parallel DS/P/Algos Dirty TrainingData But Skip Check") {
    val uncheckedParams = defaultWorkflowParams.copy(skipSanityCheck = true)

    val processed = ProcessedData(1, TrainingData(0, error = true))
    val expectedModels = Seq(
      PAlgo0.Model(2, processed),
      PAlgo1.Model(3, processed),
      PAlgo0.Model(4, processed))

    val trained = Engine.train(
      sc,
      new PDataSource3(0, error = true),
      new PPreparator0(1),
      Seq(new PAlgo0(2), new PAlgo1(3), new PAlgo0(4)),
      uncheckedParams)

    trained should contain theSameElementsAs expectedModels
  }
}
| |
| |
/** Tests for the static `Engine.eval` entry point, run against the sample
  * components imported from `Engine0`.
  *
  * All tests share one verification routine, [[verifyEvalDataSet]], which
  * also asserts the number of evaluation sets and of queries per set so that
  * the nested `forAll` inspections cannot pass on an empty result.
  */
class EngineEvalSuite
  extends FunSuite with Inside with SharedSparkContext {
  import io.prediction.controller.Engine0._

  @transient lazy val logger = Logger[this.type]

  /** Checks the complete output of `Engine.eval`.
    *
    * @param evalDataSet output of `Engine.eval`
    * @param en expected number of evaluation sets
    * @param qn expected number of (query, prediction, actual) triples per set
    * @param expectedInfo `EvalInfo` expected for every evaluation set
    * @param servingId id expected on the outer (served) prediction
    * @param expectedPredictions for a given query, the per-algorithm
    *                            predictions expected inside the served
    *                            prediction, in order
    */
  private def verifyEvalDataSet(
    evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])],
    en: Int,
    qn: Int,
    expectedInfo: EvalInfo,
    servingId: Int)(
    expectedPredictions: Query => Seq[Prediction]): Unit = {
    // Without this check, an empty result would satisfy forAll trivially.
    evalDataSet should have size en

    forAll(evalDataSet.zipWithIndex) { case ((evalInfo, qpaRDD), ex) =>
      evalInfo shouldBe expectedInfo

      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
      // Same reasoning as above, one level down.
      qpaSeq should have size qn

      forAll (qpaSeq) { case (q, p, a) =>
        val Query(qId, qEx, qQx) = q
        val Actual(aId, aEx, aQx) = a
        // Query and actual must agree with each other and with the index of
        // the evaluation set they were read from.
        qId shouldBe aId
        qEx shouldBe ex
        aEx shouldBe ex
        qQx shouldBe aQx

        inside (p) { case Prediction(pId, pQ, pModels, pPs) =>
          val expectedPs = expectedPredictions(q)
          pId shouldBe servingId
          pQ shouldBe q
          pModels shouldBe None
          pPs should have size expectedPs.size
          pPs shouldBe expectedPs
        }
      }
    }
  }

  // Parallel pipeline with a single PAlgo0.
  test("Simple Parallel DS/P/A/S") {
    val en = 2
    val qn = 5

    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
      Engine.eval(
        sc,
        new PDataSource1(id = 1, en = en, qn = qn),
        new PPreparator0(id = 2),
        Seq(new PAlgo0(id = 3)),
        new LServing0(id = 10))

    val pd = ProcessedData(2, TrainingData(1))
    val model0 = PAlgo0.Model(3, pd)

    verifyEvalDataSet(evalDataSet, en, qn, EvalInfo(1), servingId = 10) { q =>
      Seq(Prediction(id = 3, q = q, models = Some(model0)))
    }
  }

  // Parallel pipeline with three algorithms (PAlgo0, PAlgo1, NAlgo1).
  test("Parallel DS/P/A/S") {
    val en = 2
    val qn = 5

    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
      Engine.eval(
        sc,
        new PDataSource1(id = 1, en = en, qn = qn),
        new PPreparator0(id = 2),
        Seq(
          new PAlgo0(id = 3),
          new PAlgo1(id = 4),
          new NAlgo1(id = 5)),
        new LServing0(id = 10))

    val pd = ProcessedData(2, TrainingData(1))
    val model0 = PAlgo0.Model(3, pd)
    val model1 = PAlgo1.Model(4, pd)
    val model2 = NAlgo1.Model(5, pd)

    verifyEvalDataSet(evalDataSet, en, qn, EvalInfo(1), servingId = 10) { q =>
      Seq(
        Prediction(id = 3, q = q, models = Some(model0)),
        Prediction(id = 4, q = q, models = Some(model1)),
        Prediction(id = 5, q = q, models = Some(model2))
      )
    }
  }

  // Local pipeline with three algorithms (LAlgo0, LAlgo1, LAlgo1).
  test("Local DS/P/A/S") {
    val en = 2
    val qn = 5

    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
      Engine.eval(
        sc,
        new LDataSource0(id = 1, en = en, qn = qn),
        new LPreparator0(id = 2),
        Seq(
          new LAlgo0(id = 3),
          new LAlgo1(id = 4),
          new LAlgo1(id = 5)),
        new LServing0(id = 10))

    val pd = ProcessedData(2, TrainingData(1))
    val model0 = LAlgo0.Model(3, pd)
    val model1 = LAlgo1.Model(4, pd)
    val model2 = LAlgo1.Model(5, pd)

    verifyEvalDataSet(evalDataSet, en, qn, EvalInfo(1), servingId = 10) { q =>
      Seq(
        Prediction(id = 3, q = q, models = Some(model0)),
        Prediction(id = 4, q = q, models = Some(model1)),
        Prediction(id = 5, q = q, models = Some(model2))
      )
    }
  }
}
| |
| |