| package org.template.ecommercerecommendation |
| |
| import io.prediction.controller.P2LAlgorithm |
| import io.prediction.controller.Params |
| import io.prediction.data.storage.BiMap |
| import io.prediction.data.storage.Event |
| import io.prediction.data.store.LEventStore |
| |
| import org.apache.spark.SparkContext |
| import org.apache.spark.SparkContext._ |
| import org.apache.spark.mllib.recommendation.ALS |
| import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} |
| import org.apache.spark.rdd.RDD |
| |
| import grizzled.slf4j.Logger |
| |
| import scala.collection.mutable.PriorityQueue |
| import scala.concurrent.duration.Duration |
| import scala.concurrent.ExecutionContext.Implicits.global |
| |
/**
 * Engine parameters for [[ECommAlgorithm]].
 *
 * @param appName       app whose event store is queried at serving time
 * @param unseenOnly    when true, items the user has already "seen" are black-listed
 * @param seenEvents    event names that count as the user having "seen" an item
 * @param rank          rank passed to ALS.trainImplicit
 * @param numIterations number of ALS iterations
 * @param lambda        ALS regularization parameter
 * @param seed          ALS random seed; when None, System.nanoTime is used
 */
case class ECommAlgorithmParams(
  appName: String,
  unseenOnly: Boolean,
  seenEvents: List[String],
  rank: Int,
  numIterations: Int,
  lambda: Double,
  seed: Option[Long]
) extends Params
| |
| |
/**
 * Per-item state stored in the trained model.
 *
 * @param item     the item's properties
 * @param features ALS feature vector; None when ALS produced no vector for the item
 * @param count    popularity score (number of counted buy events; 0 if none)
 */
case class ProductModel(
  item: Item,
  features: Option[Array[Double]],
  count: Int
)
| |
/**
 * Trained model: ALS user feature vectors, per-item product models, and the
 * maps between string IDs and the integer indices used by ALS.
 *
 * @param rank             rank of the trained ALS model
 * @param userFeatures     user index -> ALS feature vector
 * @param productModels    item index -> [[ProductModel]]
 * @param userStringIntMap user string ID <-> user index
 * @param itemStringIntMap item string ID <-> item index
 */
class ECommModel(
  val rank: Int,
  val userFeatures: Map[Int, Array[Double]],
  val productModels: Map[Int, ProductModel],
  val userStringIntMap: BiMap[String, Int],
  val itemStringIntMap: BiMap[String, Int]
) extends Serializable {

  // Item index -> string ID. @transient + lazy: not serialized with the model,
  // recomputed from itemStringIntMap on first use.
  @transient lazy val itemIntStringMap = itemStringIntMap.inverse

  override def toString: String = {
    // Arrays print only as a JVM hash by default, so render feature values explicitly.
    val userFeaturePreview = userFeatures.take(2).toList.map {
      case (index, features) => (index, features.mkString("[", ", ", "]"))
    }
    s" rank: ${rank}" +
    s" userFeatures: [${userFeatures.size}]" +
    s"(${userFeaturePreview}...)" +
    s" productModels: [${productModels.size}]" +
    s"(${productModels.take(2).toList}...)" +
    s" userStringIntMap: [${userStringIntMap.size}]" +
    s"(${userStringIntMap.take(2)}...)" +
    s" itemStringIntMap: [${itemStringIntMap.size}]" +
    s"(${itemStringIntMap.take(2)}...)"
  }
}
| |
/**
 * E-commerce recommendation algorithm built on MLlib implicit-feedback ALS.
 *
 * Training factorizes per user-item view counts with ALS and, separately,
 * counts buy events per item as a popularity score. Serving picks one of three
 * strategies per query:
 *  - the user has an ALS feature vector: rank items by dot product with it;
 *  - otherwise, if the user's recently viewed items have feature vectors: rank
 *    items by summed cosine similarity to those items;
 *  - otherwise: rank items by popularity (buy count).
 * All strategies share the same candidate filter (white list, black list,
 * categories). The black list also covers "seen" and unavailable items.
 */
class ECommAlgorithm(val ap: ECommAlgorithmParams)
  extends P2LAlgorithm[PreparedData, ECommModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  /**
   * Trains an [[ECommModel]] from the prepared data.
   *
   * @throws IllegalArgumentException if viewEvents, users or items is empty, or
   *   if no view event refers to both a known user and a known item
   */
  def train(sc: SparkContext, data: PreparedData): ECommModel = {
    requireNonEmpty(data.viewEvents, "viewEvents")
    requireNonEmpty(data.users, "users")
    requireNonEmpty(data.items, "items")

    // String ID <-> integer index maps; the ALS ratings below use integer IDs.
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    val mllibRatings: RDD[MLlibRating] = genMLlibRating(
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap,
      data = data
    )

    // MLlib ALS cannot handle empty training data.
    require(!mllibRatings.take(1).isEmpty,
      s"mllibRatings cannot be empty." +
      " Please check if your events contain valid user and item ID.")

    // Use the configured seed for reproducible runs; otherwise derive one from the clock.
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    val userFeatures = m.userFeatures.collectAsMap.toMap

    // Key items by integer index so they can be joined with the ALS product features.
    val items = data.items.map { case (id, item) =>
      (itemStringIntMap(id), item)
    }

    // Left outer join: items without an ALS feature vector are kept with
    // features = None, so popularity-based serving can still return them.
    val productFeatures: Map[Int, (Item, Option[Array[Double]])] =
      items.leftOuterJoin(m.productFeatures).collectAsMap.toMap

    val popularCount = trainDefault(
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap,
      data = data
    )

    val productModels: Map[Int, ProductModel] = productFeatures.map {
      case (index, (item, features)) =>
        val pm = ProductModel(
          item = item,
          features = features,
          // popularCount only has entries for items with at least one counted buy.
          count = popularCount.getOrElse(index, 0)
        )
        (index, pm)
    }

    new ECommModel(
      rank = m.rank,
      userFeatures = userFeatures,
      productModels = productModels,
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap
    )
  }

  /**
   * Fails with a descriptive IllegalArgumentException when `rdd` has no elements.
   *
   * @param name field name of `rdd` inside PreparedData, used in the message
   */
  private def requireNonEmpty[T](rdd: RDD[T], name: String): Unit =
    require(!rdd.take(1).isEmpty,
      s"${name} in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")

  /**
   * Generates MLlib ratings from view events.
   *
   * Each rating is the number of view events for one (user index, item index)
   * pair. Events whose user or item ID is not in the maps are logged and dropped.
   * The returned RDD is cached.
   * You may customize here if you use different events or a different aggregation method.
   */
  def genMLlibRating(
    userStringIntMap: BiMap[String, Int],
    itemStringIntMap: BiMap[String, Int],
    data: PreparedData): RDD[MLlibRating] = {

    data.viewEvents
      .map { r =>
        (toIndexPair(r.user, r.item, userStringIntMap, itemStringIntMap), 1)
      }
      .filter { case ((u, i), _) => u != -1 && i != -1 } // keep valid indices only
      .reduceByKey(_ + _) // aggregate all view events of the same user-item pair
      .map { case ((u, i), v) => MLlibRating(u, i, v) }
      .cache()
  }

  /**
   * Trains the popularity ("default") model: number of buy events per item index.
   *
   * Buy events whose user or item ID is not in the maps are logged and not counted.
   * Items with no counted buys are absent from the result.
   * You may customize here if you use different events or need a different
   * way to compute the "popular" score.
   */
  def trainDefault(
    userStringIntMap: BiMap[String, Int],
    itemStringIntMap: BiMap[String, Int],
    data: PreparedData): Map[Int, Int] = {

    // (item index, number of buy events)
    val buyCountsRDD: RDD[(Int, Int)] = data.buyEvents
      .map { r => toIndexPair(r.user, r.item, userStringIntMap, itemStringIntMap) }
      .filter { case (u, i) => u != -1 && i != -1 } // keep valid indices only
      .map { case (_, i) => (i, 1) } // key is item
      .reduceByKey(_ + _) // count occurrences of each item

    buyCountsRDD.collectAsMap.toMap
  }

  /**
   * Converts a user/item string ID pair to integer indices.
   * An ID missing from its map becomes -1 (callers filter those out) and is logged.
   */
  private def toIndexPair(
    user: String,
    item: String,
    userStringIntMap: BiMap[String, Int],
    itemStringIntMap: BiMap[String, Int]): (Int, Int) = {

    val uindex = userStringIntMap.getOrElse(user, -1)
    val iindex = itemStringIntMap.getOrElse(item, -1)

    if (uindex == -1)
      logger.info(s"Couldn't convert nonexistent user ID ${user}" +
        " to Int index.")

    if (iindex == -1)
      logger.info(s"Couldn't convert nonexistent item ID ${item}" +
        " to Int index.")

    (uindex, iindex)
  }

  /**
   * Returns up to `query.num` recommended items for `query.user`, best first.
   * Unknown white-/black-list item IDs are ignored.
   */
  def predict(model: ECommModel, query: Query): PredictedResult = {

    val productModels = model.productModels

    // Convert the white list's string IDs to integer indices (unknown IDs dropped).
    val whiteList: Option[Set[Int]] = query.whiteList.map { set =>
      set.flatMap(model.itemStringIntMap.get(_))
    }

    // Convert the combined black list's string IDs to integer indices (unknown IDs dropped).
    val finalBlackList: Set[Int] =
      genBlackList(query = query).flatMap(x => model.itemStringIntMap.get(x))

    val userFeature: Option[Array[Double]] =
      model.userStringIntMap.get(query.user).flatMap(model.userFeatures.get)

    val topScores: Array[(Int, Double)] = userFeature match {
      case Some(feature) =>
        // The user has a feature vector.
        predictKnownUser(
          userFeature = feature,
          productModels = productModels,
          query = query,
          whiteList = whiteList,
          blackList = finalBlackList
        )

      case None =>
        // No feature vector, e.g. a user created after the model was trained.
        logger.info(s"No userFeature found for user ${query.user}.")

        // Fall back on the user's recent item views, if any.
        val recentItems: Set[String] = getRecentItems(query)
        val recentList: Set[Int] =
          recentItems.flatMap(x => model.itemStringIntMap.get(x))

        // productModels may not contain the item, and the item may have no features.
        val recentFeatures: Vector[Array[Double]] = recentList.toVector
          .flatMap(i => productModels.get(i).flatMap(_.features))

        if (recentFeatures.isEmpty) {
          logger.info(s"No features vector for recent items ${recentItems}.")
          predictDefault(
            productModels = productModels,
            query = query,
            whiteList = whiteList,
            blackList = finalBlackList
          )
        } else {
          predictSimilar(
            recentFeatures = recentFeatures,
            productModels = productModels,
            query = query,
            whiteList = whiteList,
            blackList = finalBlackList
          )
        }
    }

    val itemScores = topScores.map { case (i, s) =>
      new ItemScore(
        // convert item int index back to string ID
        item = model.itemIntStringMap(i),
        score = s
      )
    }

    new PredictedResult(itemScores)
  }

  /**
   * Builds the final black list of item string IDs for a query.
   *
   * It is the union of:
   *  - the query's own black list;
   *  - the user's "seen" items, when `unseenOnly` is set;
   *  - the items in the latest `$set` event of constraint "unavailableItems".
   *
   * Each event-store lookup is limited to 200 ms. On timeout it contributes an
   * empty set; any other exception is logged and rethrown.
   */
  def genBlackList(query: Query): Set[String] = {
    val seenItems: Set[String] =
      if (ap.unseenOnly) {
        // all of the user's item events that are configured as "seen" events
        val seenEvents: Iterator[Event] = try {
          LEventStore.findByEntity(
            appName = ap.appName,
            entityType = "user",
            entityId = query.user,
            eventNames = Some(ap.seenEvents),
            targetEntityType = Some(Some("item")),
            // set time limit to avoid super long DB access
            timeout = Duration(200, "millis")
          )
        } catch {
          case e: scala.concurrent.TimeoutException =>
            logger.error(s"Timeout when read seen events." +
              s" Empty list is used. ${e}")
            Iterator[Event]()
          case e: Exception =>
            logger.error(s"Error when read seen events: ${e}")
            throw e
        }
        targetItemIds(seenEvents)
      } else {
        Set[String]()
      }

    // items listed by the latest constraint "unavailableItems" $set event
    val unavailableItems: Set[String] = try {
      val constr = LEventStore.findByEntity(
        appName = ap.appName,
        entityType = "constraint",
        entityId = "unavailableItems",
        eventNames = Some(Seq("$set")),
        limit = Some(1),
        latest = true,
        timeout = Duration(200, "millis")
      )
      if (constr.hasNext) {
        constr.next.properties.get[Set[String]]("items")
      } else {
        Set[String]()
      }
    } catch {
      case e: scala.concurrent.TimeoutException =>
        logger.error(s"Timeout when read set unavailableItems event." +
          s" Empty list is used. ${e}")
        Set[String]()
      case e: Exception =>
        logger.error(s"Error when read set unavailableItems event: ${e}")
        throw e
    }

    query.blackList.getOrElse(Set[String]()) ++ seenItems ++ unavailableItems
  }

  /**
   * Returns the item IDs of the user's latest (at most 10) "view" events.
   * On a 200 ms lookup timeout the result is empty; other exceptions are
   * logged and rethrown.
   */
  def getRecentItems(query: Query): Set[String] = {
    val recentEvents = try {
      LEventStore.findByEntity(
        appName = ap.appName,
        // entityType and entityId is specified for fast lookup
        entityType = "user",
        entityId = query.user,
        eventNames = Some(Seq("view")),
        targetEntityType = Some(Some("item")),
        limit = Some(10),
        latest = true,
        // set time limit to avoid super long DB access
        timeout = Duration(200, "millis")
      )
    } catch {
      case e: scala.concurrent.TimeoutException =>
        logger.error(s"Timeout when read recent events." +
          s" Empty list is used. ${e}")
        Iterator[Event]()
      case e: Exception =>
        logger.error(s"Error when read recent events: ${e}")
        throw e
    }

    targetItemIds(recentEvents)
  }

  /**
   * Collects the target entity IDs of `events` into a set.
   *
   * @throws NoSuchElementException (after logging) if an event has no targetEntityId
   */
  private def targetItemIds(events: Iterator[Event]): Set[String] =
    events.map { event =>
      event.targetEntityId.getOrElse {
        logger.error(s"Can't get targetEntityId of event ${event}.")
        throw new NoSuchElementException(
          s"targetEntityId is not defined for event ${event}")
      }
    }.toSet

  /**
   * Prediction for a user with a known feature vector.
   *
   * The score is the dot product of the user and item feature vectors. Only
   * candidate items with feature vectors and a positive score are kept.
   */
  def predictKnownUser(
    userFeature: Array[Double],
    productModels: Map[Int, ProductModel],
    query: Query,
    whiteList: Option[Set[Int]],
    blackList: Set[Int]
  ): Array[(Int, Double)] = {
    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
      .filter { case (i, pm) =>
        pm.features.isDefined &&
        isCandidateItem(
          i = i,
          item = pm.item,
          categories = query.categories,
          whiteList = whiteList,
          blackList = blackList
        )
      }
      .map { case (i, pm) =>
        // features are defined (checked by the filter above), so .get is safe
        (i, dotProduct(userFeature, pm.features.get))
      }
      .filter(_._2 > 0) // only keep items with score > 0
      .seq // convert back to sequential collection

    topByScore(indexScores, query.num)
  }

  /**
   * Default prediction when nothing is known about the user.
   * The score is the item's popularity count. Candidates need no feature vector,
   * and zero-count items are kept.
   */
  def predictDefault(
    productModels: Map[Int, ProductModel],
    query: Query,
    whiteList: Option[Set[Int]],
    blackList: Set[Int]
  ): Array[(Int, Double)] = {
    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
      .filter { case (i, pm) =>
        isCandidateItem(
          i = i,
          item = pm.item,
          categories = query.categories,
          whiteList = whiteList,
          blackList = blackList
        )
      }
      .map { case (i, pm) => (i, pm.count.toDouble) }
      .seq // convert back to sequential collection

    topByScore(indexScores, query.num)
  }

  /**
   * Returns the items most similar to the recently interacted items.
   *
   * The score is the sum of cosine similarities between the item's feature
   * vector and each vector in `recentFeatures`. Only positive scores are kept.
   */
  def predictSimilar(
    recentFeatures: Vector[Array[Double]],
    productModels: Map[Int, ProductModel],
    query: Query,
    whiteList: Option[Set[Int]],
    blackList: Set[Int]
  ): Array[(Int, Double)] = {
    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
      .filter { case (i, pm) =>
        pm.features.isDefined &&
        isCandidateItem(
          i = i,
          item = pm.item,
          categories = query.categories,
          whiteList = whiteList,
          blackList = blackList
        )
      }
      .map { case (i, pm) =>
        // pm.features is defined because of the filter above
        val features = pm.features.get
        // .sum (not reduce) so an empty recentFeatures yields 0.0 instead of throwing
        (i, recentFeatures.map(rf => cosine(rf, features)).sum)
      }
      .filter(_._2 > 0) // keep items with score > 0
      .seq // convert back to sequential collection

    topByScore(indexScores, query.num)
  }

  /** Returns the `n` highest-scoring (item index, score) pairs, highest first. */
  private def topByScore(
    indexScores: Iterable[(Int, Double)],
    n: Int): Array[(Int, Double)] = {
    val byScoreDescending = Ordering.by[(Int, Double), Double](_._2).reverse
    getTopN(indexScores, n)(byScoreDescending).toArray
  }

  /**
   * Returns the `n` smallest elements of `s` under `ord`, in ascending `ord` order.
   * Returns an empty result when `n <= 0`.
   */
  private
  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
    if (n <= 0) {
      // Without this guard, q.head below would be called on an empty queue.
      Seq.empty
    } else {
      // The head is the largest kept element under ord, i.e. the first to evict.
      val q = PriorityQueue.empty[T](ord)

      for (x <- s) {
        if (q.size < n) {
          q.enqueue(x)
        } else if (ord.compare(x, q.head) < 0) {
          // q is full and x beats the current worst element
          q.dequeue()
          q.enqueue(x)
        }
      }

      // dequeueAll yields descending ord order; reverse it to ascending.
      q.dequeueAll.toSeq.reverse
    }
  }

  /** Dot product over the first `v1.size` elements (v2 must be at least that long). */
  private
  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
    val size = v1.size
    var i = 0
    var d: Double = 0
    while (i < size) {
      d += v1(i) * v2(i)
      i += 1
    }
    d
  }

  /** Cosine similarity of v1 and v2; 0 when either vector has zero norm. */
  private
  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
    val size = v1.size
    var i = 0
    var n1: Double = 0
    var n2: Double = 0
    var d: Double = 0
    while (i < size) {
      n1 += v1(i) * v1(i)
      n2 += v2(i) * v2(i)
      d += v1(i) * v2(i)
      i += 1
    }
    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
    if (n1n2 == 0) 0 else (d / n1n2)
  }

  /**
   * Whether item index `i` may be recommended. It must:
   *  - be in the white list, if one is given;
   *  - not be in the black list;
   *  - share at least one category with the query, if the query has categories.
   *    Items with no categories are discarded in that case.
   */
  private
  def isCandidateItem(
    i: Int,
    item: Item,
    categories: Option[Set[String]],
    whiteList: Option[Set[Int]],
    blackList: Set[Int]
  ): Boolean = {
    // can add other custom filtering here
    whiteList.forall(_.contains(i)) &&
    !blackList.contains(i) &&
    categories.forall { cat =>
      item.categories.exists(itemCat => itemCat.toSet.intersect(cat).nonEmpty)
    }
  }

}