blob: 4eaee43a62892470c7eaeb5e08a7dcc41ee7771d [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
package io.prediction.controller
import io.prediction.core.BaseDataSource
import io.prediction.core.BasePreparator
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.reflect._
import scala.reflect.runtime.universe._
/** Base class of a parallel preparator.
  *
  * A parallel preparator can be run in parallel on a cluster and produces
  * prepared data that is distributed across a cluster.
  *
  * @tparam TD Training data class.
  * @tparam PD Prepared data class.
  * @group Preparator
  */
abstract class PPreparator[TD, PD]
  extends BasePreparator[TD, PD] {

  /** Delegates to [[prepare]] with the same arguments.
    *
    * @param sc An Apache Spark context.
    * @param td Training data to be prepared.
    * @return Whatever [[prepare]] returns for `td`.
    */
  def prepareBase(sc: SparkContext, td: TD): PD = {
    prepare(sc, td)
  }

  /** Implement this method to produce prepared data that is ready for model
    * training.
    *
    * @param sc An Apache Spark context.
    * @param trainingData Training data to be prepared.
    * @return Prepared data.
    */
  def prepare(sc: SparkContext, trainingData: TD): PD
}
/** Base class of a local preparator.
  *
  * A local preparator runs locally within a single machine and produces
  * prepared data that can fit within a single machine.
  *
  * @tparam TD Training data class.
  * @tparam PD Prepared data class.
  * @group Preparator
  */
abstract class LPreparator[TD, PD : ClassTag]
  extends BasePreparator[RDD[TD], RDD[PD]] {

  /** Applies [[prepare]] to every element of `rddTd`.
    *
    * The `ClassTag` context bound on `PD` is what allows the resulting
    * `RDD[PD]` to be built.
    *
    * @param sc An Apache Spark context (not used by this implementation).
    * @param rddTd Training data wrapped in an RDD.
    * @return An RDD holding the prepared counterpart of each element.
    */
  def prepareBase(sc: SparkContext, rddTd: RDD[TD]): RDD[PD] = {
    // NOTE(review): this body was absent from the truncated source and has
    // been reconstructed from the surrounding signatures; confirm upstream.
    rddTd.map(prepare)
  }

  /** Implement this method to produce prepared data that is ready for model
    * training.
    *
    * @param trainingData Training data to be prepared.
    * @return Prepared data.
    */
  def prepare(trainingData: TD): PD
}
/** A helper concrete implementation of [[io.prediction.core.BasePreparator]]
  * that passes training data through without any special preparation.
  *
  * @tparam TD Training data class; also the prepared data class.
  * @group Preparator
  */
class IdentityPreparator[TD] extends BasePreparator[TD, TD] {

  /** Returns the training data unchanged.
    *
    * @param sc An Apache Spark context (not used).
    * @param td Training data.
    * @return The same `td` that was passed in.
    */
  def prepareBase(sc: SparkContext, td: TD): TD = td
}
/** Companion of [[IdentityPreparator]], the pass-through implementation of
  * [[io.prediction.core.BasePreparator]].
  *
  * @group Preparator
  */
object IdentityPreparator {

  /** Returns the class of [[IdentityPreparator]] for the training data type
    * of the given data source.
    *
    * @param ds Data source class; only its type is used, to fix `TD`.
    * @return The `Class` object of `IdentityPreparator[TD]`.
    */
  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
    classOf[IdentityPreparator[TD]]
}
/** A helper concrete implementation of [[io.prediction.controller.PPreparator]]
  * that passes training data through without any special preparation.
  *
  * @tparam TD Training data class; also the prepared data class.
  * @group Preparator
  */
class PIdentityPreparator[TD] extends PPreparator[TD, TD] {

  /** Returns the training data unchanged.
    *
    * @param sc An Apache Spark context (not used).
    * @param td Training data.
    * @return The same `td` that was passed in.
    */
  def prepare(sc: SparkContext, td: TD): TD = td
}
/** Companion of [[PIdentityPreparator]], the pass-through implementation of
  * [[io.prediction.controller.PPreparator]].
  *
  * @group Preparator
  */
object PIdentityPreparator {

  /** Returns the class of [[PIdentityPreparator]] for the training data type
    * of the given data source.
    *
    * @param ds Data source class; only its type is used, to fix `TD`.
    * @return The `Class` object of `PIdentityPreparator[TD]`.
    */
  def apply[TD](ds: Class[_ <: PDataSource[TD, _, _, _]]): Class[PIdentityPreparator[TD]] =
    classOf[PIdentityPreparator[TD]]
}
/** A helper concrete implementation of [[io.prediction.controller.LPreparator]]
  * that passes training data through without any special preparation.
  *
  * `TD` carries no `ClassTag` bound here, so the evidence required by
  * `LPreparator`'s context bound is passed explicitly as
  * `JavaUtils.fakeClassTag[TD]`.
  *
  * @tparam TD Training data class; also the prepared data class.
  * @group Preparator
  */
class LIdentityPreparator[TD]
  extends LPreparator[TD, TD]()(JavaUtils.fakeClassTag[TD]) {

  /** Returns the training data unchanged.
    *
    * @param td Training data.
    * @return The same `td` that was passed in.
    */
  def prepare(td: TD): TD = td
}
/** Companion of [[LIdentityPreparator]], the pass-through implementation of
  * [[io.prediction.controller.LPreparator]].
  *
  * @group Preparator
  */
object LIdentityPreparator {

  /** Returns the class of [[LIdentityPreparator]] for the training data type
    * of the given data source.
    *
    * @param ds Data source class; only its type is used, to fix `TD`.
    * @return The `Class` object of `LIdentityPreparator[TD]`.
    */
  def apply[TD](ds: Class[_ <: LDataSource[TD, _, _, _]]): Class[LIdentityPreparator[TD]] =
    classOf[LIdentityPreparator[TD]]
}