blob: ebc3d60e4a287459666720132616a49e66547374 [file] [log] [blame]
package io.prediction.commons.scalding.appdata.file
import com.twitter.scalding._
import cascading.pipe.Pipe
import cascading.flow.FlowDef
import org.joda.time.DateTime
import io.prediction.commons.scalding.appdata.UsersSource
import io.prediction.commons.scalding.appdata.UsersSource.FIELD_SYMBOLS
import io.prediction.commons.appdata.{ User }
/**
* File Format:
* <id>\t<appid>\t<ct>
*
* Example:
* 1 2 12345
*/
class FileUsersSource(path: String, appId: Int) extends Tsv(
p = path + "users.tsv"
) with UsersSource {
import com.twitter.scalding.Dsl._ // get all the fancy implicit conversions that define the DSL
override def getSource: Source = this
override def readData(uidField: Symbol)(implicit fd: FlowDef): Pipe = {
val users = this.read
.mapTo((0) -> (uidField)) { fields: String =>
fields
}
users
}
override def readObj(objField: Symbol)(implicit fd: FlowDef): Pipe = {
val users = this.read
.mapTo((0, 1, 2) -> objField) { fields: (String, Int, Long) =>
val (id, appid, ct) = fields
User(
id = id,
appid = appid,
ct = new DateTime(ct),
latlng = None,
inactive = None,
attributes = None
)
}
users
}
override def writeData(uidField: Symbol, appid: Int)(p: Pipe)(implicit fd: FlowDef): Pipe = {
val writtenData = p.mapTo((uidField) ->
(FIELD_SYMBOLS("id"), FIELD_SYMBOLS("appid"))) {
fields: String =>
val uid = fields
(uid, appid)
}.write(this)
writtenData
}
override def writeObj(objField: Symbol)(p: Pipe)(implicit fd: FlowDef): Pipe = {
val writtenData = p.mapTo((objField) ->
(FIELD_SYMBOLS("id"), FIELD_SYMBOLS("appid"), FIELD_SYMBOLS("ct"))) { obj: User =>
val ct: java.util.Date = obj.ct.toDate()
(obj.id, obj.appid, ct.getTime())
}.write(this)
writtenData
}
}