blob: 896303d2fe3068e991c27678fb179f2928c74f9e [file] [log] [blame]
/** Copyright 2014 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.engines.itemrank
// This module allows users to evaluate their algorithms against their actual
// data. It takes the MongoDB dump (from version 0.7 or earlier) of the three
// key collections: User, Item, and U2IActions. It evaluates the data in a
// time-rolling fashion. First, it sets a cutoff day d, uses the data before
// it for training, and then uses the following n days for testing. Then it
// shifts the cutoff day by n, i.e. it uses data before (d + n days) for
// training and data between d + n and d + 2n for testing, and so on until the
// last day.
//
// In each test, we construct a query using a combination of 1. all conversion
// data of a user on a day; and 2. the global top x sold items of that day.
//
// Notice that this replay may not be completely accurate, as User and Item are
// not event logs.
import io.prediction.controller._
import io.prediction.controller.{ Params => BaseParams }
import com.github.nscala_time.time.Imports._
import org.joda.time.Instant
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.io.Source
import scala.collection.immutable.HashMap
import scala.util.hashing.MurmurHash3
import io.prediction.engines.base.ItemTD
import io.prediction.engines.base.UserTD
import io.prediction.engines.base.U2IActionTD
import io.prediction.engines.base.TrainingData
/** Identifies one evaluation slice produced by ReplayDataSource.
  *
  * @param name human-readable slice label (ReplayDataSource.generateParams
  *   sets it to the cutoff date followed by its short day-of-week name)
  * @param idx offset, in days from the configured base date, of this slice's
  *   training cutoff
  */
case class ReplaySliceParams(name: String, idx: Int)
  extends Serializable
  with HasName
object ReplayDataSource {
  /** Configuration for the replay data source.
    *
    * Each dump file is read line by line, one JSON object per line.
    *
    * @param userPath path to the User collection dump
    * @param itemPath path to the Item collection dump
    * @param u2iPath path to the U2IActions collection dump
    * @param baseDate reference date; cutoff and test-day offsets are added to it
    * @param fromIdx first cutoff offset in days from baseDate (inclusive)
    * @param untilIdx upper bound on cutoff and test-day offsets (exclusive)
    * @param testingWindowSize number of test days after each cutoff, and the
    *   step in days between consecutive cutoffs
    * @param numTopSoldItems number of the day's top items mixed into each query
    * @param whitelistItypes only items having at least one of these itypes are
    *   considered
    * @param whitelistAction only u2i actions of exactly this type are considered
    */
  case class Params(
    val userPath: String,
    val itemPath: String,
    val u2iPath: String,
    val baseDate: LocalDate,
    val fromIdx: Int,
    val untilIdx: Int,
    val testingWindowSize: Int,
    // Mix the top x sold items into each Query.
    val numTopSoldItems: Int,
    // Only items belonging to this whitelist are considered.
    val whitelistItypes: Seq[String],
    // Only u2i actions of this single action type are considered.
    val whitelistAction: String
  ) extends BaseParams

  /** Filtered dump contents plus the index maps derived from them.
    *
    * A user's index ("ui") is its position in `userList`. An item's index
    * ("ii") is its position in `itemList`.
    *
    * Every derived map is built eagerly with `map` rather than `mapValues`.
    * In Scala 2.10-2.12, `mapValues` returns a lazy view that re-runs its
    * function on every lookup and is not serializable (scala/bug#7005), which
    * would break the `Serializable` contract of this class.
    *
    * NOTE: because `date2ActionTds` is built eagerly, construction throws
    * NoSuchElementException if any u2i references a uid or iid that is absent
    * from `userList` / `itemList`.
    */
  case class PreprocessedData(
    val userList: Array[User],
    val itemList: Array[Item],
    val date2u2iList: Map[LocalDate, Array[U2I]]
  ) extends Serializable {
    /** User index -> UserTD built from the user's `_id`. */
    val ui2UserTd: Map[Int, UserTD] = userList
      .zipWithIndex
      .map { case (user, ui) => ui -> new UserTD(user._id) }
      .toMap

    /** User index -> uid. */
    val ui2uid: Map[Int, String] = ui2UserTd.map { case (ui, td) => ui -> td.uid }

    /** uid -> user index. */
    val uid2ui: Map[String, Int] = ui2uid.map(_.swap)

    /** Item index -> ItemTD built from the item's `_id` and itypes. */
    val ii2ItemTd: Map[Int, ItemTD] = itemList
      .zipWithIndex
      .map { case (item, ii) =>
        ii -> new ItemTD(item._id, item.itypes.toSeq, None, None, false)
      }
      .toMap

    /** Item index -> iid. */
    val ii2iid: Map[Int, String] = ii2ItemTd.map { case (ii, td) => ii -> td.iid }

    /** iid -> item index. */
    val iid2ii: Map[String, Int] = ii2iid.map(_.swap)

    /** Per date, that day's u2i actions converted to index-based U2IActionTDs. */
    val date2ActionTds: Map[LocalDate, Array[U2IActionTD]] = date2u2iList
      .map { case (date, u2is) =>
        date -> u2is.map { u2i =>
          new U2IActionTD(
            uid2ui(u2i.uid),
            iid2ii(u2i.iid),
            u2i.action,
            None,
            u2i.t.$date)
        }
      }

    /** Per date, (iid, number of that day's actions on it), most frequent first. */
    val dailyServedItems: Map[LocalDate, Array[(String, Int)]] = date2u2iList
      .map { case (date, u2is) =>
        val counts: Array[(String, Int)] = u2is
          .map(_.iid)
          .groupBy(identity)
          .map { case (iid, occurrences) => (iid, occurrences.length) }
          .toArray
        date -> counts.sortBy(-_._2)
      }
  }
}
/** Replays MongoDB dumps (User, Item, U2IActions) as a series of time-rolling
  * evaluation slices.
  *
  * For each slice, actions dated strictly before the cutoff
  * (`baseDate + idx` days) form the training data. Each of the following
  * `testingWindowSize` days (bounded by `untilIdx`) yields one query per user
  * who has whitelisted actions on that day.
  */
class ReplayDataSource(val dsp: ReplayDataSource.Params)
  extends LDataSource[
    ReplayDataSource.Params, ReplaySliceParams, TrainingData, Query, Actual] {

  /** Reads a file of newline-delimited JSON objects, extracting each line as
    * a `T`. The file handle is closed even if reading or parsing fails.
    */
  private def readJsonLines[T](path: String)(implicit mf: Manifest[T]): Array[T] = {
    implicit val formats: Formats = DefaultFormats
    val source = Source.fromFile(path)
    try {
      source.getLines.map { line => parse(line).extract[T] }.toArray
    } finally {
      source.close()
    }
  }

  /** Loads the raw users, items and u2i actions from the configured dumps. */
  def load(): (Array[User], Array[Item], Array[U2I]) = {
    val u2iList = readJsonLines[U2I](dsp.u2iPath)
    val userList = readJsonLines[User](dsp.userPath)
    val itemList = readJsonLines[Item](dsp.itemPath)
    (userList, itemList, u2iList)
  }

  /** Applies the itype and action whitelists and buckets actions by date.
    *
    * Items without any whitelisted itype are dropped. So are u2i actions on
    * dropped items and actions of any other type than `whitelistAction`.
    * Remaining actions are grouped by the local date of `U2I.dt`.
    */
  def preprocess(input: (Array[User], Array[Item], Array[U2I]))
    : ReplayDataSource.PreprocessedData = {
    val (users, items, u2is) = input
    val whitelistItypeSet: Set[String] = dsp.whitelistItypes.toSet
    val validItems: Array[Item] = items.filter(_.itypes.exists(whitelistItypeSet))
    val validIidSet: Set[String] = validItems.map(_._id).toSet
    val date2Actions: Map[LocalDate, Array[U2I]] = u2is
      .filter(u2i => validIidSet(u2i.iid) && u2i.action == dsp.whitelistAction)
      .groupBy(_.dt.toLocalDate)
    ReplayDataSource.PreprocessedData(users, validItems, date2Actions)
  }

  /** One slice per cutoff offset in [fromIdx, untilIdx), stepping by
    * testingWindowSize days.
    */
  def generateParams(): Seq[ReplaySliceParams] = {
    Range(dsp.fromIdx, dsp.untilIdx, dsp.testingWindowSize).map { idx =>
      val trainingUntilDate: LocalDate = dsp.baseDate.plusDays(idx)
      val dow = trainingUntilDate.dayOfWeek.getAsShortText
      ReplaySliceParams(
        name = s"${trainingUntilDate.toString()} $dow",
        idx = idx)
    }
  }

  /** Builds the training data and (query, actual) pairs for one slice. */
  def generateOne(input: (ReplayDataSource.PreprocessedData, ReplaySliceParams))
    : (ReplaySliceParams, TrainingData, Array[(Query, Actual)]) = {
    val (data, dp) = input
    val date2u2iList: Map[LocalDate, Array[U2I]] = data.date2u2iList
    val uid2ui = data.uid2ui
    val dailyServedItems = data.dailyServedItems

    val trainingUntilDate: LocalDate = dsp.baseDate.plusDays(dp.idx)
    println("TrainingUntil: " + trainingUntilDate.toString)

    // Training period: every day strictly before the cutoff.
    val trainingDate2Actions: Map[LocalDate, Array[U2IActionTD]] =
      data.date2ActionTds.filterKeys(k => k.isBefore(trainingUntilDate))
    val trainingActions: Array[U2IActionTD] = trainingDate2Actions
      .values
      .flatMap(_.toSeq)
      .toArray

    val trainingData = new TrainingData(
      HashMap[Int, UserTD]() ++ data.ui2UserTd,
      HashMap[Int, ItemTD]() ++ data.ii2ItemTd,
      Array[U2IActionTD]() ++ trainingActions)

    // ---- Per-user statistics over the training period (keyed by user index).

    // Total number of training actions.
    val uiActionsMap: Map[Int, Int] = trainingActions
      .groupBy(_.uindex)
      .mapValues(_.size)

    // Per training day: (user index, order size), where a user's actions on
    // one day count as one order.
    val date2OrderSizeMap: Map[LocalDate, Array[(Int, Int)]] =
      trainingDate2Actions
        .mapValues {
          _.groupBy(_.uindex).mapValues(_.size).toArray
        }

    // All order sizes of each user, one entry per day the user was active.
    val ui2OrderSizes: Map[Int, Iterable[Int]] = date2OrderSizeMap
      .values
      .flatMap(_.toSeq)
      .groupBy(_._1)
      .map { case (ui, pairs) => ui -> pairs.map(_._2) }

    val uiAverageSizeMap: Map[Int, Double] = ui2OrderSizes
      .map { case (ui, sizes) => ui -> sizes.sum.toDouble / sizes.size }

    val uiPreviousOrdersMap: Map[Int, Int] = ui2OrderSizes
      .map { case (ui, sizes) => ui -> sizes.size }

    // Number of distinct items acted on.
    val uiVarietyMap: Map[Int, Int] = trainingActions
      .groupBy(_.uindex)
      .mapValues(_.map(_.iindex).distinct.size)

    // ---- Test queries: one per user active on each test day.
    val queryDates: Seq[LocalDate] =
      Range(dp.idx, math.min(dp.idx + dsp.testingWindowSize, dsp.untilIdx))
        .map { queryIdx => dsp.baseDate.plusDays(queryIdx) }

    val queryActionList: Array[(Query, Actual)] = queryDates.flatMap { queryDate =>
      val u2is = date2u2iList.getOrElse(queryDate, Array[U2I]())
      // A day without any whitelisted actions has no entry in
      // dailyServedItems either, so it must not be looked up with apply().
      val todayItems: Seq[String] = dailyServedItems
        .getOrElse(queryDate, Array[(String, Int)]())
        .take(dsp.numTopSoldItems)
        .map(_._1)

      // Iterate a Seq, not a Map: mapping a Map to (Query, Actual) pairs
      // would silently build a Map[Query, Actual].
      u2is.groupBy(_.uid).toSeq.map { case (uid, actions) =>
        val iids: Array[String] = actions.map(_.iid)
        // Candidates are the user's own items plus today's top items. They
        // are ordered by MurmurHash3 of the iid for a stable pseudo-random
        // order.
        val sortedIids = (iids ++ todayItems)
          .distinct
          .sortBy(iid => MurmurHash3.stringHash(iid))
        val query = new Query(uid, sortedIids)
        val ui = uid2ui(uid)
        val actual = new Actual(
          iids = iids.toSeq,
          previousActionCount = uiActionsMap.getOrElse(ui, 0),
          localDate = queryDate,
          // Use the user's first action time of the day.
          localDateTime = actions.map(_.dt.toLocalDateTime).min,
          averageOrderSize = uiAverageSizeMap.getOrElse(ui, 0.0),
          previousOrders = uiPreviousOrdersMap.getOrElse(ui, 0),
          variety = uiVarietyMap.getOrElse(ui, 0)
        )
        (query, actual)
      }
    }.toArray

    (dp, trainingData, queryActionList)
  }

  /** Generates every slice from already-preprocessed data. */
  def generate(input: ReplayDataSource.PreprocessedData)
    : Seq[(ReplaySliceParams, TrainingData, Array[(Query, Actual)])] =
    generateParams().map { params => generateOne((input, params)) }

  /** Loads, preprocesses and slices the dumps. */
  override
  def read(): Seq[(ReplaySliceParams, TrainingData, Seq[(Query, Actual)])] =
    generate(preprocess(load())).map {
      case (params, td, queryActuals) => (params, td, queryActuals.toSeq)
    }
}
/** Timestamp wrapper matching the `{"$date": ...}` objects in the dump.
  * `$date` is handed to Joda `Instant` as epoch milliseconds by U2I.
  */
case class DateObject($date: Long)
/** One user-to-item action record from the U2IActions dump. */
case class U2I(
    action: String,
    uid: String,
    iid: String,
    t: DateObject) {

  /** Action time, expressed in a fixed UTC-8 offset (no DST adjustment). */
  lazy val dt: DateTime = {
    val fixedZone = DateTimeZone.forOffsetHours(-8)
    new DateTime(new Instant(t.$date), fixedZone)
  }

  override def toString(): String = s"U2I($uid, $iid, $action, $dt)"
}
/** A user record from the User dump. `_id` is matched against `U2I.uid`. */
case class User(_id: String)
/** An item record from the Item dump. */
case class Item(
    _id: String,
    starttime: DateObject,
    itypes: Array[String],
    ca_name: String) {

  /** The item's itypes as a set, for membership tests. */
  val itypesSet: Set[String] = itypes.toSet

  override def toString(): String =
    s"${_id} $ca_name [${itypes.mkString(",")}]"
}