blob: 170d675e1d1b3af77bcd1b683cf114c12a8a3e3d [file] [log] [blame]
package io.prediction.commons.scalding.appdata
import com.twitter.scalding._
import cascading.pipe.Pipe
import cascading.flow.FlowDef
// use trait because different DB may have a bit different raw read data.
// such as time format, etc.
// The DB Source class should implement this trait so the algo implementers
// can get the same type of pipe returned regardless of the actual DB type.
trait UsersSource {
import com.twitter.scalding.Dsl._ // get all the fancy implicit conversions that define the DSL
def getSource: Source
/**
* read data and return Pipe with field name of the Symbol parameters and expected data type
* uidField: Symbol of iid(String)
*/
def readData(uidField: Symbol)(implicit fd: FlowDef): Pipe
/**
* read User object
*/
def readObj(objField: Symbol)(implicit fd: FlowDef): Pipe = {
throw new RuntimeException("UsersSource readObj is not implemented.")
}
/**
* map pipe's field data to DB table fields and write to dbSink
* uidField: Symbol of iid(String)
* appid: Appid Int
*/
def writeData(uidField: Symbol, appid: Int)(p: Pipe)(implicit fd: FlowDef): Pipe
/**
* write User object
*/
def writeObj(objField: Symbol)(p: Pipe)(implicit fd: FlowDef): Pipe = {
throw new RuntimeException("UsersSource writeObj is not implemented.")
}
}
object UsersSource {
/**
* define the corresponding cascading Symbol name for each DB table field.
* ("table field name" -> Symbol)
*/
val FIELD_SYMBOLS: Map[String, Symbol] = Map(
("id" -> 'id),
("appid" -> 'appid),
("ct" -> 'ct))
}