/** Copyright 2014 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.engines.itemrank

import io.prediction.controller.Metrics
import io.prediction.controller.Params
import io.prediction.controller.NiceRendering
import com.github.nscala_time.time.Imports._
import scala.math.BigDecimal
import breeze.stats.meanAndVariance
import org.json4s._
import org.json4s.native.Serialization
import io.prediction.engines.util.{ MetricsVisualization => MV }
import scala.util.hashing.MurmurHash3

trait HasName {
def name: String
}
case class Stats(
val average: Double,
val count: Long,
val stdev: Double,
val min: Double,
val max: Double
) extends Serializable
case class HeatMapData (
val columns: Array[String],
val data: Seq[(String, Array[Double])]
) extends Serializable
case class DetailedMetricsData(
val name: String,
val measureType: String,
val algoMean: Double,
val algoStats: Stats,
val heatMap: HeatMapData,
val runs: Seq[(String, Stats, Stats)], // name, algo, baseline
val aggregations: Seq[(String, Seq[(String, Stats)])])
extends Serializable with NiceRendering {
override def toString(): String = {
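    // Half-width of a 95% confidence interval, assuming the sample mean is
    // approximately normally distributed (z = 1.96).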
val b = 1.96 * algoStats.stdev / Math.sqrt(algoStats.count)
val lb = algoStats.average - b
val ub = algoStats.average + b
f"$measureType $name ${algoStats.average}%.4f [$lb%.4f, $ub%.4f]"
}
def toHTML(): String = html.detailed().toString
def toJSON(): String = {
implicit val formats = DefaultFormats
Serialization.write(this)
}
}
object MeasureType extends Enumeration {
type Type = Value
val MeanAveragePrecisionAtK, PrecisionAtK = Value
}
abstract class ItemRankMeasure extends Serializable {
def calculate(query: Query, prediction: Prediction, actual: Actual): Double
}
// If k == -1, use the query size; otherwise, use k.
class ItemRankMAP(val k: Int) extends ItemRankMeasure {
def calculate(query: Query, prediction: Prediction, actual: Actual)
: Double = {
val kk = (if (k == -1) query.iids.size else k)
averagePrecisionAtK(
kk,
prediction.items.map(_._1),
actual.iids.toSet)
}
override def toString(): String = s"MAP@$k"
  // Average precision at k (AP@k): the mean of the precision values at each
  // rank where a relevant item appears, normalized by min(k, |relevant|).
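  // Illustrative example (not from the original source): with k = 5,
  // p = (a, b, c, d, e) and r = {a, c, f}:
  //   rBin = (1, 0, 1, 0, 0), precision at hit ranks = (1/1, 2/3),
  //   AP@5 = (1.0 + 0.6667) / min(5, 3) ≈ 0.5556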
private def averagePrecisionAtK[T](k: Int, p: Seq[T], r: Set[T]): Double = {
    // predictedItems.size should normally match k
    // NOTE: if predictedItems has fewer than k items, use the available size as k.
val n = scala.math.min(p.size, k)
// find if each element in the predictedItems is one of the relevant items
// if so, map to 1. else map to 0
// (0, 1, 0, 1, 1, 0, 0)
val rBin: Seq[Int] = p.take(n).map { x => if (r(x)) 1 else 0 }
    val pAtKNum = rBin.scanLeft(0)(_ + _)
      .drop(1) // drop the initial 0 from scanLeft
      .zip(rBin)
      // number of hits at this rank if hit, or 0 if miss
      .map(t => if (t._2 != 0) t._1.toDouble else 0.0)
    val pAtKDenom = 1 to rBin.size
    val pAtK = pAtKNum.zip(pAtKDenom).map { t => t._1 / t._2 }
val apAtKDenom = scala.math.min(n, r.size)
if (apAtKDenom == 0) 0 else pAtK.sum / apAtKDenom
}
}
class ItemRankPrecision(val k: Int) extends ItemRankMeasure {
override def toString(): String = s"Precision@$k"
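  // Illustrative example (not from the original source): with kk = 3,
  // predictions (a, b, c, d) and actual items {b, d}: relevantCount = 1,
  // denominator = min(3, 4, 2) = 2, so Precision@3 = 0.5.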
def calculate(query: Query, prediction: Prediction, actual: Actual)
: Double = {
val kk = (if (k == -1) query.iids.size else k)
val actualItems: Set[String] = actual.iids.toSet
val relevantCount = prediction.items.take(kk)
.map(_._1)
.filter(iid => actualItems(iid))
.size
val denominator = Seq(kk, prediction.items.size, actualItems.size).min
relevantCount.toDouble / denominator
}
}
// optOutputPath is used for debugging purposes. If specified, the metrics
// will write the data class to the specified path, and the renderer can
// generate the HTML independently.
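// Illustrative usage (parameter values are hypothetical):
//   new DetailedMetricsParams(
//     name = "itemrank-eval",
//     measureType = MeasureType.PrecisionAtK,
//     measureK = 20)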
class DetailedMetricsParams(
val name: String = "",
val optOutputPath: Option[String] = None,
val buckets: Int = 10,
val measureType: MeasureType.Type = MeasureType.MeanAveragePrecisionAtK,
val measureK: Int = -1
) extends Params {}
class ItemRankDetailedMetrics(params: DetailedMetricsParams)
extends Metrics[DetailedMetricsParams,
HasName, Query, Prediction, Actual,
MetricUnit, Seq[MetricUnit], DetailedMetricsData] {
val measure: ItemRankMeasure = params.measureType match {
case MeasureType.MeanAveragePrecisionAtK => {
new ItemRankMAP(params.measureK)
}
case MeasureType.PrecisionAtK => {
new ItemRankPrecision(params.measureK)
}
case _ => {
throw new NotImplementedError(
s"MeasureType ${params.measureType} not implemented")
}
}
override def computeUnit(query: Query, prediction: Prediction,
actual: Actual): MetricUnit = {
val score = measure.calculate(query, prediction, actual)
    // For the baseline, we score the items in the query's original input order.
val baseline = measure.calculate(
query,
Prediction(query.iids.map(e => (e, 0.0)), isOriginal = false),
actual)
val mu = new MetricUnit(
q = query,
p = prediction,
a = actual,
score = score,
baseline = baseline,
uidHash = MurmurHash3.stringHash(query.uid)
)
/*
if (mu.score > 0.80 && mu.score < 1.00) {
println()
println(mu.score)
println(mu.q)
println(mu.p)
println(mu.a)
}
*/
mu
}
  // computeSet passes units through; aggregation happens in computeMultipleSets.
override def computeSet(dataParams: HasName,
metricUnits: Seq[MetricUnit]): Seq[MetricUnit] = metricUnits
def calculate(values: Seq[Double]): Stats = {
val mvc = meanAndVariance(values)
Stats(mvc.mean, mvc.count, mvc.stdDev, values.min, values.max)
}
def calculateResample(
values: Seq[(Int, Double)]
): Stats = {
    // Delete-d jackknife resampling: keys (uid hashes) are assigned to
    // params.buckets segments; for each segment we compute the mean of all
    // values outside that segment, then take stats over those
    // leave-one-segment-out means to estimate the variability of the mean.
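    // Illustrative example (not from the original source): with buckets = 2
    // and values ((0, 0.2), (1, 0.4), (2, 0.6), (3, 0.8)):
    //   segment 0 holds {0.2, 0.6}, segment 1 holds {0.4, 0.8}
    //   leave-segment-out means = ((2.0 - 0.8) / 2, (2.0 - 1.2) / 2)
    //                           = (0.6, 0.4)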
val segmentSumCountMap: Map[Int, (Double, Int)] = values
.map{ case(k, v) => (k % params.buckets, v) }
.groupBy(_._1)
.mapValues(l => (l.map(_._2).sum, l.size))
val sum = values.map(_._2).sum
val count = values.size
val segmentMeanMap: Map[Int, Double] = segmentSumCountMap
.mapValues { case(sSum, sCount) => (sum - sSum) / (count - sCount) }
val segmentValues = segmentMeanMap.values
    // Assume the mean is normally distributed.
val mvc = meanAndVariance(segmentValues)
// Double => String
/*
val groupByFunc = groupByRange((0.0 until 1.0 by 0.1).toArray, "%.2f")
val bucketByScoreMap: Map[String, Int] = values
.map(_._2).map(groupByFunc)
.groupBy(identity).mapValues(_.size)
*/
// Stats describes the properties of the buckets
    Stats(mvc.mean, params.buckets, mvc.stdDev,
      segmentValues.min, segmentValues.max)
}
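  // Illustrative usage (not from the original source):
  //   aggregateMU(units, _.p.isOriginal.toString)
  // yields one (key, Stats) pair per distinct isOriginal value, sorted by
  // descending average score.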
def aggregateMU(units: Seq[MetricUnit], groupByFunc: MetricUnit => String)
: Seq[(String, Stats)] =
aggregate[MetricUnit](units, _.score, groupByFunc)
def aggregate[T](
units: Seq[T],
scoreFunc: T => Double,
groupByFunc: T => String): Seq[(String, Stats)] = {
units
.groupBy(groupByFunc)
.mapValues(_.map(e => scoreFunc(e)))
.map{ case(k, l) => (k, calculate(l)) }
.toSeq
.sortBy(-_._2.average)
}
  // Returns a function that maps a double to the label of the half-open
  // interval [values(i - 1), values(i)) containing it.
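  // Illustrative example: groupByRange(Array(0.0, 1.0), "%.0f") maps
  //   -0.5 -> "[-Infinity, 0)", 0.5 -> "[0, 1)", 1.5 -> "[1, Infinity)"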
def groupByRange(values: Array[Double], format: String = "%f")
: Double => String = {
val keys: Array[String] = (0 to values.size).map { i =>
val s = (if (i == 0) Double.NegativeInfinity else values(i-1))
val e = (if (i < values.size) values(i) else Double.PositiveInfinity)
//s"[$s, $e)"
"[" + format.format(s) + ", " + format.format(e) + ")"
}.toArray
    def f(v: Double): String = {
      // Find the first boundary strictly greater than v; -1 means v is
      // beyond the last boundary.
      val i = values.indexWhere(b => v < b)
      keys(if (i == -1) values.size else i)
    }
    f
}
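  // Computes, for each input row, the fraction of its values that fall into
  // each boundary-delimited bucket. Illustrative example (not from the
  // original source): boundaries = Array(0.5) with values (0.3, 0.7) gives
  // columns ("[-Infinity, 0.50)", "[0.50, Infinity)") and row (0.5, 0.5).
  // NOTE: bucketing below uses v <= b, so values equal to a boundary land in
  // the lower bucket even though the column labels read as half-open [s, e).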
def computeHeatMap(
input: Seq[(String, Seq[Double])],
boundaries: Array[Double],
format: String = "%.2f") : HeatMapData = {
val bCount: Int = boundaries.size
val columns: Array[String] = (0 to bCount).map { i =>
val s = (if (i == 0) Double.NegativeInfinity else boundaries(i-1))
val e = (
if (i < boundaries.size) boundaries(i)
else Double.PositiveInfinity)
"[" + format.format(s) + ", " + format.format(e) + ")"
}.toArray
val data: Seq[(String, Array[Double])] = input
.map { case(key, values) => {
val sum = values.size
//val distributions: Array[Double] = values
val distributions: Map[Int, Double] = values
.map { v =>
val i = boundaries.indexWhere(b => (v <= b))
(if (i == -1) bCount else i)
}
.groupBy(identity)
.mapValues(_.size.toDouble)
.mapValues(v => v / sum)
(key, (0 to bCount).map { i => distributions.getOrElse(i, 0.0) }.toArray)
}}
HeatMapData(columns = columns, data = data)
}
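  // Combines all runs into one report: overall and per-run jackknife stats
  // (algo vs. baseline), per-dimension aggregations, and a per-run score
  // distribution heat map.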
override def computeMultipleSets(
input: Seq[(HasName, Seq[MetricUnit])]): DetailedMetricsData = {
val allUnits: Seq[MetricUnit] = input.flatMap(_._2)
val overallStats = (
"Overall",
calculateResample(allUnits.map(mu => (mu.uidHash, mu.score))),
calculateResample(allUnits.map(mu => (mu.uidHash, mu.baseline))))
val runsStats: Seq[(String, Stats, Stats)] = input
.map { case(dp, mus) =>
(dp.name,
calculateResample(mus.map(mu => (mu.uidHash, mu.score))),
calculateResample(mus.map(mu => (mu.uidHash, mu.baseline))))
}
.sortBy(_._1)
// Aggregation Stats
val aggregateByActualSize: Seq[(String, Stats)] = allUnits
.groupBy(_.a.iids.size)
.mapValues(_.map(_.score))
.map{ case(k, l) => (k.toString, calculate(l)) }
.toSeq
.sortBy(-_._2.average)
val scoreAggregation = aggregateMU(
allUnits,
mu => groupByRange((0.0 until 1.0 by 0.1).toArray, "%.2f")(mu.score))
val actionCountAggregation = aggregateMU(
allUnits,
mu => groupByRange(Array(0, 1, 3, 10, 30, 100, 300), "%.0f")
(mu.a.previousActionCount))
val itemCountAggregation = aggregate[(String, MetricUnit)](
allUnits.flatMap(mu => mu.a.iids.map(item => (item, mu))),
_._2.score,
_._1)
val dateAggregation = aggregateMU(
allUnits,
_.a.localDate.toString)
val localHourAggregation = aggregateMU(
allUnits,
_.a.localDateTime.getHourOfDay.toString)
val isOriginalAggregation = aggregateMU(allUnits, _.p.isOriginal.toString)
val avgOrderSizeAggregation = aggregateMU(
allUnits,
mu => groupByRange(Array(0, 1, 2, 3, 5, 8, 13), "%.0f")
(mu.a.averageOrderSize))
val previousOrdersAggregation = aggregateMU(
allUnits,
mu => groupByRange(Array(0, 1, 3, 10, 30, 100), "%.0f")
(mu.a.previousOrders))
val varietyAggregation = aggregateMU(
allUnits,
mu => groupByRange(Array(0, 1, 2, 3, 5, 8, 13, 21), "%.0f")
(mu.a.variety))
val heatMapInput: Seq[(String, Seq[Double])] = input
.map { case(k, mus) => (k.name, mus.map(_.score)) }
val heatMap: HeatMapData = computeHeatMap(
heatMapInput,
(0.0 until 1.0 by 0.0333).toArray,
"%.2f")
val outputData = DetailedMetricsData (
name = params.name,
measureType = measure.toString,
algoMean = overallStats._2.average,
algoStats = overallStats._2,
//runs = Seq(overallStats, baselineStats) ++ runsStats,
//runs = Seq(overallStats, overallResampledStats) ++ runsStats,
heatMap = heatMap,
runs = Seq(overallStats) ++ runsStats,
aggregations = Seq(
("ByActualSize", aggregateMU(allUnits, _.a.iids.size.toString)),
("ByScore", scoreAggregation),
("ByActionCount", actionCountAggregation),
("ByFlattenItem", itemCountAggregation),
("ByDate", dateAggregation),
("ByLocalHour", localHourAggregation),
("ByIsOriginal", isOriginalAggregation),
("ByAvgOrderSize", avgOrderSizeAggregation),
("ByPreviousOrders", previousOrdersAggregation),
("ByVariety", varietyAggregation)
)
)
    // If an output path is specified, persist the data for offline rendering.
    params.optOutputPath.foreach { path =>
      MV.save(outputData, path)
    }
outputData
}
private def printDouble(d: Double): String = {
BigDecimal(d).setScale(4, BigDecimal.RoundingMode.HALF_UP).toString
}
}
object ItemRankDetailedMain {
  def main(args: Array[String]): Unit = {
MV.render(MV.load[DetailedMetricsData](args(0)), args(0))
}
}