// blob: e5f7d39e855461ce3fce24da8f860b588c34cff5 [file] [log] [blame]
package io.prediction.commons.scalding.appdata
import com.twitter.scalding._
import cascading.pipe.Pipe
import cascading.flow.FlowDef
// A trait is used because different DBs may return the raw data in slightly
// different forms (e.g. a different time format).
// Each DB Source class should implement this trait so that algorithm implementers
// get the same type of Pipe back regardless of the actual DB type.
/**
 * Contract for a source of U2I action records (action, uid, iid, t, v).
 *
 * Implementations read the records into a Pipe whose field names are chosen
 * by the caller, and write such a Pipe back to their sink.
 */
trait U2iActionsSource {

  // Scalding DSL implicit conversions. Note: an import placed here is visible
  // only inside this trait body.
  import com.twitter.scalding.Dsl._

  /** Returns the Source for this data. */
  def getSource: Source

  /**
   * Reads the data and returns a Pipe whose fields are named by the given Symbols.
   *
   * @param actionField Symbol of action (String)
   * @param uidField    Symbol of uid (String)
   * @param iidField    Symbol of iid (String)
   * @param tField      Symbol of t (String)
   * @param vField      Symbol of v (Option[String])
   * @param fd          implicit FlowDef
   * @return a Pipe carrying the fields under the requested names
   */
  def readData(
    actionField: Symbol,
    uidField: Symbol,
    iidField: Symbol,
    tField: Symbol,
    vField: Symbol
  )(implicit fd: FlowDef): Pipe

  /**
   * Maps the pipe's field data to the DB table fields and writes it to the DB sink.
   *
   * @param actionField Symbol of action (String)
   * @param uidField    Symbol of uid (String)
   * @param iidField    Symbol of iid (String)
   * @param tField      Symbol of t (String)
   * @param vField      Symbol of v (Option[String])
   * @param appid       App ID
   * @param p           the data pipe to write
   * @param fd          implicit FlowDef
   * @return a Pipe (which one is up to the implementation)
   */
  def writeData(
    actionField: Symbol,
    uidField: Symbol,
    iidField: Symbol,
    tField: Symbol,
    vField: Symbol,
    appid: Int
  )(p: Pipe)(implicit fd: FlowDef): Pipe
}
object U2iActionsSource {

  /**
   * The corresponding cascading Symbol name for each DB table field
   * ("table field name" -> Symbol).
   *
   * Every field maps to the Symbol of the same name, so the map is derived
   * from a single list of names. `Symbol(name)` is used instead of the
   * `'name` literal syntax, which is deprecated in Scala 2.13 and removed in
   * Scala 3; both forms produce the same interned Symbol.
   */
  val FIELD_SYMBOLS: Map[String, Symbol] =
    Seq("action", "uid", "iid", "t", "v", "appid")
      .map(name => name -> Symbol(name))
      .toMap
}