blob: 8b653d8566b6478ed5c27b3174748c5babe23823 [file] [log] [blame]
package io.prediction.metrics.commons.map
import io.prediction.commons.Config
import io.prediction.commons.settings.OfflineEvalResult
import io.prediction.commons.filepath.OfflineMetricFile
import scala.io.Source
import java.io.File
import java.io.FileWriter
import java.io.BufferedWriter
import grizzled.slf4j.Logger
case class MAPAtKConfig(
appid: Int = 0,
engineid: Int = 0,
evalid: Int = 0,
metricid: Int = 0,
algoid: Int = 0,
iteration: Int = 0,
splitset: String = "",
k: Int = 0,
goal: String = "",
debug: Boolean = false)
/**
* Mean Average Precision at K for Single Machine
*
* TODO: Eliminate use of Config object. Let scheduler handles it all.
*/
object MAPAtK {
def main(args: Array[String]) {
val commonsConfig = new Config()
val engines = commonsConfig.getSettingsEngines
val parser = new scopt.OptionParser[MAPAtKConfig]("mapatk") {
head("mapatk")
opt[Int]("appid") required () action { (x, c) =>
c.copy(appid = x)
} text ("the App ID that this metric will be applied to")
opt[Int]("engineid") required () action { (x, c) =>
c.copy(engineid = x)
} validate { x =>
engines.get(x) map { _ => success } getOrElse failure(s"the Engine ID does not correspond to a valid Engine")
} text ("the Engine ID that this metric will be applied to")
opt[Int]("evalid") required () action { (x, c) =>
c.copy(evalid = x)
} text ("the OfflineEval ID that this metric will be applied to")
opt[Int]("metricid") required () action { (x, c) =>
c.copy(metricid = x)
} text ("the OfflineEvalMetric ID that this metric will be applied to")
opt[Int]("algoid") required () action { (x, c) =>
c.copy(algoid = x)
} text ("the Algo ID that this metric will be applied to")
opt[Int]("iteration") required () action { (x, c) =>
c.copy(iteration = x)
} text ("the iteration number (starts from 1 for the 1st iteration and then increment for later iterations)")
opt[String]("splitset") required () action { (x, c) =>
c.copy(splitset = x)
} validate { x =>
if (x == "validation" || x == "test") success else failure("--splitset must be either 'validation' or 'test'")
} text ("the split set (validation or test) that this metric will be run against")
opt[Int]("k") required () action { (x, c) =>
c.copy(k = x)
} text ("the k parameter for MAP@k")
opt[String]("goal") required () action { (x, c) =>
c.copy(goal = x)
} validate { x =>
x match {
case "view" | "conversion" | "like" | "rate3" | "rate4" | "rate5" => success
case _ => failure("invalid goal specified")
}
} text ("actions to be treated as relevant (valid values: view, conversion, like, rate3, rate4, rate5)")
opt[Unit]("debug") hidden () action { (x, c) =>
c.copy(debug = true)
} text ("debug mode")
}
parser.parse(args, MAPAtKConfig()) map { config =>
val logger = Logger(MAPAtK.getClass)
val u2iDb = if (config.splitset == "validation") commonsConfig.getAppdataValidationU2IActions else commonsConfig.getAppdataTestU2IActions
val resultsDb = commonsConfig.getSettingsOfflineEvalResults
val engine = engines.get(config.engineid).get
// Collect relevant items for all users and items
logger.info("Collecting relevance data...")
val u2i = u2iDb.getAllByAppid(config.evalid).toSeq filter { u2iAction =>
config.goal match {
case "view" | "conversion" | "like" => u2iAction.action == config.goal
case "rate3" => try { u2iAction.action == "rate" && u2iAction.v.get >= 3 } catch {
case e: Exception =>
logger.error(s"${u2iAction.uid}-${u2iAction.iid}: ${u2iAction.v} (${e.getMessage()})")
false
}
case "rate4" => try { u2iAction.action == "rate" && u2iAction.v.get >= 4 } catch {
case e: Exception =>
logger.error(s"${u2iAction.uid}-${u2iAction.iid}: ${u2iAction.v} (${e.getMessage()})")
false
}
case "rate5" => try { u2iAction.action == "rate" && u2iAction.v.get == 5 } catch {
case e: Exception =>
logger.error(s"${u2iAction.uid}-${u2iAction.iid}: ${u2iAction.v} (${e.getMessage()})")
false
}
}
}
val relevantItems = u2i.groupBy(_.uid).mapValues(_.map(_.iid).toSet)
val relevantUsers = if (engine.infoid == "itemsim") u2i.groupBy(_.iid).mapValues(_.map(_.uid).toSet) else Map[String, Set[String]]()
val relevantItemsPath = OfflineMetricFile(
commonsConfig.settingsLocalTempRoot,
config.appid,
config.engineid,
config.evalid,
config.metricid,
config.algoid,
"relevantItems.tsv")
val relevantItemsWriter = new BufferedWriter(new FileWriter(new File(relevantItemsPath)))
relevantItems.foreach {
case (uid, iids) =>
val iidsString = iids.mkString(",")
relevantItemsWriter.write(s"${uid}\t${iidsString}\n")
}
relevantItemsWriter.close()
logger.info(s"# users: ${relevantItems.size}")
if (engine.infoid == "itemsim") logger.info(s"# items: ${relevantUsers.size}")
// Read top-k list for every user
val topKFilePath = OfflineMetricFile(
commonsConfig.settingsLocalTempRoot,
config.appid,
config.engineid,
config.evalid,
config.metricid,
config.algoid,
"topKItems.tsv")
logger.info(s"Reading top-K list from: $topKFilePath")
val prefixSize = config.evalid.toString.length + 1
val topKItems: Map[String, Seq[String]] = engine.infoid match {
case "itemrec" => {
Source.fromFile(topKFilePath).getLines().toSeq.map(
_.split("\t")).groupBy(
_.apply(0)) map { t =>
(t._1.drop(prefixSize) -> t._2.apply(0).apply(1).split(",").toSeq.map(_.drop(prefixSize)))
}
}
case "itemsim" => {
/**
* topKItems.tsv for ItemSim
* iid simiid score
* i0 i1 3.2
* i0 i4 2.5
* i0 i5 1.4
*
* 1. Read all lines into a Seq[String].
* 2. Split by \t into Seq[Array[String]].
* 3. Group by first element (iid) into Map[String, Seq[Array[String]]].
* 4. Sort and filter Seq[Array[String]] to become Map[String, Seq[String]].
*/
Source.fromFile(topKFilePath).getLines().toSeq.map(
_.split("\t")).groupBy(
_.apply(0)) map { t =>
(t._1.drop(prefixSize) -> t._2.sortBy(_.apply(2)).reverse.map(_.apply(1).drop(prefixSize)))
}
}
}
logger.info(s"Running MAP@${config.k} for ${engine.infoid} engine")
val mapAtK: Double = engine.infoid match {
case "itemrec" => {
val apAtK = topKItems map { t =>
val score = relevantItems.get(t._1) map { ri =>
averagePrecisionAtK(config.k, t._2, ri)
} getOrElse 0.0
(t._1, score)
}
val apAtKPath = OfflineMetricFile(
commonsConfig.settingsLocalTempRoot,
config.appid,
config.engineid,
config.evalid,
config.metricid,
config.algoid,
"apAtK.tsv")
val apAtKWriter = new BufferedWriter(new FileWriter(new File(apAtKPath)))
apAtK.foreach {
case (uid, score) =>
apAtKWriter.write(s"${uid}\t${score}\n")
}
apAtKWriter.close()
apAtK.map(_._2).sum / scala.math.min(topKItems.size, relevantItems.size)
}
case "itemsim" => {
val iapAtK = topKItems map { t =>
relevantUsers.get(t._1) map { ru =>
val apAtK = ru map { uid =>
relevantItems.get(uid) map { ri =>
averagePrecisionAtK(config.k, t._2, ri)
} getOrElse 0.0
}
apAtK.sum / scala.math.min(ru.size, relevantItems.size)
} getOrElse 0.0
}
iapAtK.sum / scala.math.min(topKItems.size, relevantUsers.size)
}
case _ => 0.0
}
logger.info(s"MAP@${config.k} = $mapAtK. Saving results...")
resultsDb.save(OfflineEvalResult(
evalid = config.evalid,
metricid = config.metricid,
algoid = config.algoid,
score = mapAtK,
iteration = config.iteration,
splitset = config.splitset))
logger.info(s"MAP@${config.k} has run to completion")
}
}
/**
* Calculate the mean average precision @ k
*
* ap@k = sum(P(i)/min(m, k)) wher i=1 to k
* k is number of prediction to be retrieved.
* P(i) is the precision at position i of the top-K list
* if the item at position i is relevant, then P(i) = (the number of releavent items up to that position in the top-k list / position)
* if the item at position i is not relevant, then P(i)=0
* m is the number of relevant items for this user.
*
* @return the average precision at k
*/
def averagePrecisionAtK(k: Int, predictedItems: Seq[String], relevantItems: Set[String]): Double = {
// supposedly the predictedItems.size should match k
// NOTE: what if predictedItems is less than k? use the avaiable items as k.
val n = scala.math.min(predictedItems.size, k)
// find if each element in the predictedItems is one of the relevant items
// if so, map to 1. else map to 0
// (0, 1, 0, 1, 1, 0, 0)
val relevantBinary: Seq[Int] = predictedItems.take(n).map { x => if (relevantItems(x)) 1 else 0 }
val pAtKNom = relevantBinary.scanLeft(0)(_ + _).drop(1).zip(relevantBinary).map(t => if (t._2 != 0) t._1 else 0).map(_.toDouble)
val pAtKDenom = 1 to relevantBinary.size
val pAtK = pAtKNom zip pAtKDenom map { t => t._1 / t._2 }
val apAtKDenom = scala.math.min(n, relevantItems.size)
if (apAtKDenom == 0) 0 else pAtK.sum / apAtKDenom
}
}