// blob: 3683e2c710dbc54b1909f4d2ff2f0c880f595ac0 [file] [log] [blame]
package io.prediction.metrics.scalding.itemrec.map
import com.twitter.scalding._
import io.prediction.commons.filepath.OfflineMetricFile
import io.prediction.commons.scalding.appdata.U2iActions
//import io.prediction.commons.scalding.modeldata.ItemRecScores
/**
* Source:
* Test set u2iActions.
*
* Sink:
* relevantItems.tsv eg u0 i0,i1,i2
*
* Description:
* Generate relevantItems for MAP@k
*
* Required args:
* --test_dbType: <string> test_appdata DB type (eg. mongodb)
* --test_dbName: <string>
*
* --training_dbType: <string> training_appdata DB type
* --training_dbName: <string>
*
* --modeldata_dbType: <string> modeldata DB type
* --modeldata_dbName: <string>
*
* --hdfsRoot: <string>. Root directory of the HDFS
*
* --appid: <int>
* --engineid: <int>
* --evalid: <int>
* --metricid: <int>
* --algoid: <int>
*
* --kParam: <int>
* --goalParam: <string> ("view", "conversion", "like", "rate3", "rate4", "rate5")
*
* Optional args:
* --test_dbHost: <string> (eg. "127.0.0.1")
* --test_dbPort: <int> (eg. 27017)
*
* --training_dbHost: <string>
* --training_dbPort: <int>
*
* --modeldata_dbHost: <string>
* --modeldata_dbPort: <int>
*
* --debug: <String>. "test" - for testing purpose
*
* Example:
* scald.rb --hdfs-local io.prediction.metrics.scalding.itemrec.map.MAPAtKDataPreparator --test_dbType mongodb --test_dbName test_appdata --test_dbHost 127.0.0.1 --test_dbPort 27017 --training_dbType mongodb --training_dbName training_appdata --training_dbHost 127.0.0.1 --training_dbPort 27017 --modeldata_dbType file --modeldata_dbName modeldata_path/ --hdfsRoot hdfs/predictionio/ --appid 34 --engineid 3 --evalid 15 --metricid 10 --algoid 9 --kParam 30 --goalParam rate3
*
*/
class MAPAtKDataPreparator(args: Args) extends Job(args) {

  /**
   * parse arguments
   */
  // Arguments documented as required are read with args(...); the optional
  // host/port arguments are read with args.list(...).
  val test_dbTypeArg = args("test_dbType")
  val test_dbNameArg = args("test_dbName")
  val test_dbHostArg = args.list("test_dbHost")
  val test_dbPortArg = args.list("test_dbPort") map (x => x.toInt)

  // The training_* and modeldata_* arguments are parsed but not referenced by
  // the computation in this class (the training source below is commented out).
  val training_dbTypeArg = args("training_dbType")
  val training_dbNameArg = args("training_dbName")
  val training_dbHostArg = args.list("training_dbHost")
  val training_dbPortArg = args.list("training_dbPort") map (x => x.toInt)

  val modeldata_dbTypeArg = args("modeldata_dbType")
  val modeldata_dbNameArg = args("modeldata_dbName")
  val modeldata_dbHostArg = args.list("modeldata_dbHost")
  val modeldata_dbPortArg = args.list("modeldata_dbPort") map (x => x.toInt)

  val hdfsRootArg = args("hdfsRoot")

  val appidArg = args("appid").toInt
  val engineidArg = args("engineid").toInt
  val evalidArg = args("evalid").toInt
  val metricidArg = args("metricid").toInt
  val algoidArg = args("algoid").toInt

  // Accepted values of --goalParam.
  val GOAL_VIEW: String = "view"
  val GOAL_CONVERSION: String = "conversion"
  val GOAL_LIKE: String = "like"
  val GOAL_RATE3: String = "rate3"
  val GOAL_RATE4: String = "rate4"
  val GOAL_RATE5: String = "rate5"
  val GOAL_ARG_LIST: List[String] = List(GOAL_VIEW, GOAL_CONVERSION, GOAL_LIKE, GOAL_RATE3, GOAL_RATE4, GOAL_RATE5)

  val goalParamArg = args("goalParam")
  // Reject an unknown goal while the job is being constructed.
  require(GOAL_ARG_LIST.contains(goalParamArg), "goalParam " + goalParamArg + " is not valid.")

  // Parsed (and therefore validated as an integer) but not used by this class.
  val kParamArg = args("kParam").toInt

  /**
   * constants
   */
  final val ACTION_RATE = "rate"
  final val ACTION_LIKE = "like"
  final val ACTION_DISLIKE = "dislike"
  final val ACTION_VIEW = "view"
  //final val ACTION_VIEWDETAILS = "viewDetails"
  final val ACTION_CONVERSION = "conversion"

  /**
   * Tell whether a test-set record is a rating of at least `minRating`.
   *
   * Shared by the rate3 / rate4 / rate5 goals, which differ only in the threshold.
   *
   * @param action action name of the record
   * @param v optional raw rating value of the record
   * @param minRating smallest rating which counts as relevant
   * @return true when the action is a "rate" action whose value is >= minRating;
   *         false for any other action (the value is not inspected in that case)
   * @throws AssertionError if a "rate" action has a missing or non-integer value
   */
  private def isRatedAtLeast(action: String, v: Option[String], minRating: Int): Boolean =
    (action == ACTION_RATE) && (v match {
      case Some(raw) =>
        try {
          raw.toInt >= minRating
        } catch {
          // Thrown explicitly instead of through assert(false, ...): assert is
          // elidable, and with assertions compiled out a malformed rating would
          // be dropped silently. AssertionError is kept as the throwable type so
          // a bad record fails the job the same way as before.
          case e: NumberFormatException =>
            throw new AssertionError(s"Failed to convert v field ${v} to int. Exception:" + e)
        }
      case None =>
        throw new AssertionError(s"Failed to convert v field ${v} to int. Exception:" + "rating value is missing")
    })

  /**
   * source
   */
  /*val trainingU2i = U2iActions(appId=evalidArg,
    dbType=training_dbTypeArg, dbName=training_dbNameArg, dbHost=training_dbHostArg, dbPort=training_dbPortArg).readData('actionTrain, 'uidTrain, 'iidTrain, 'tTrain, 'vTrain)*/

  // NOTE(review): the evaluation id is passed as appId here - presumably the
  // test-set actions are stored under the evalid; confirm against U2iActions.
  val testU2i = U2iActions(appId = evalidArg,
    dbType = test_dbTypeArg, dbName = test_dbNameArg, dbHost = test_dbHostArg, dbPort = test_dbPortArg).readData('actionTest, 'uidTest, 'iidTest, 'tTest, 'vTest)

  /**
   * computation
   */
  // for each user, get a list of items which match the goalParam
  // TODO: filter out items appeared in trainingU2i?
  val testSetRelevant = testU2i
    .filter('actionTest, 'vTest) { fields: (String, Option[String]) =>
      val (action, v) = fields

      goalParamArg match {
        case GOAL_VIEW => action == ACTION_VIEW
        case GOAL_CONVERSION => action == ACTION_CONVERSION
        case GOAL_LIKE => action == ACTION_LIKE
        case GOAL_RATE3 => isRatedAtLeast(action, v, 3)
        case GOAL_RATE4 => isRatedAtLeast(action, v, 4)
        case GOAL_RATE5 => isRatedAtLeast(action, v, 5)
        // Not reachable while the require(...) above holds; kept as a guard in
        // case a goal is added to GOAL_ARG_LIST without a branch here.
        case _ => throw new AssertionError("Invalid goalParam " + goalParamArg + ".")
      }
    }
    // collect the relevant item ids of each user
    .groupBy('uidTest) { _.toList[String]('iidTest -> 'relevantList) }
    // flatten the list into one comma separated column, eg. u0 i0,i1,i2
    .mapTo(('uidTest, 'relevantList) -> ('uidTest, 'relevantList)) { fields: (String, List[String]) =>
      val (uidTest, relevantList) = fields

      (uidTest, relevantList.mkString(","))
    }
    .write(Tsv(OfflineMetricFile(hdfsRootArg, appidArg, engineidArg, evalidArg, metricidArg, algoidArg, "relevantItems.tsv")))
}