blob: c4eca3c2e4846511de1dadbdb274ffb9e7f248b3 [file] [log] [blame]
package io.prediction.commons.scalding.modeldata.file
import com.twitter.scalding._
import cascading.pipe.Pipe
import cascading.flow.FlowDef
//import io.prediction.commons.scalding.ModelDataFile
import io.prediction.commons.scalding.modeldata.ItemSimScoresSource
import io.prediction.commons.scalding.modeldata.ItemSimScoresSource.FIELD_SYMBOLS
/**
 * File-backed [[ItemSimScoresSource]] that stores item-similarity results in a
 * tab-separated file named `itemSimScores.tsv`.
 *
 * @param path string prepended verbatim to `itemSimScores.tsv`. No separator is
 *             inserted, so a directory path presumably must end with `/`
 *             (NOTE(review): confirm callers always pass a trailing slash).
 */
class FileItemSimScoresSource(path: String) extends Tsv(
  p = path + "itemSimScores.tsv" //ModelDataFile(appId, engineId, algoId, evalId, "itemSimScores.tsv")
) with ItemSimScoresSource {
  import com.twitter.scalding.Dsl._ // implicit conversions backing the fields DSL (e.g. symbol tuples -> Fields)

  /** Reads and writes go through this same Tsv source. */
  override def getSource = this

  /**
   * Flattens per-item similarity lists into one text row per item and writes the rows to this source.
   *
   * Each output row holds these fields, in order:
   *  - `iid`
   *  - `simiids`, comma-separated
   *  - `scores`, comma-separated
   *  - `simitypes`, each type list wrapped in `[...]`, and the lists joined by commas
   *  - `algoid`
   *  - `modelset`
   *
   * @param iidField     input field holding the item id
   * @param simiidsField input field holding a list of (similar item id, score, type list) triples
   * @param algoid       constant written into the `algoid` column of every row
   * @param modelSet     constant written into the `modelset` column of every row
   * @param p            pipe to read the two input fields from
   * @return the pipe returned by writing the mapped rows to this source
   */
  override def writeData(iidField: Symbol, simiidsField: Symbol, algoid: Int, modelSet: Boolean)(p: Pipe)(implicit fd: FlowDef): Pipe = {
    val inFields = (iidField, simiidsField)
    val outFields = (
      FIELD_SYMBOLS("iid"),
      FIELD_SYMBOLS("simiids"),
      FIELD_SYMBOLS("scores"),
      FIELD_SYMBOLS("simitypes"),
      FIELD_SYMBOLS("algoid"),
      FIELD_SYMBOLS("modelset"))

    // The lambda parameter keeps an explicit type: mapTo infers its input tuple type from it.
    p.mapTo(inFields -> outFields) { row: (String, List[(String, Double, List[String])]) =>
      val (iid, sims) = row
      // Split the triples into three parallel lists, keeping their order.
      val (simIds, simScores, simTypeLists) = sims.unzip3
      val simitypeText = simTypeLists.map(_.mkString("[", ",", "]")).mkString(",")
      (iid, simIds.mkString(","), simScores.mkString(","), simitypeText, algoid, modelSet)
    }.write(this)
  }
}