| --- |
| title: DASE Components Explained (Complementary Purchase) |
| --- |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <%= partial 'shared/dase/dase', locals: { template_name: 'Complementary Purchase Engine Template' } %> |
| |
| ## The Engine Design |
| |
| As you can see from the Quick Start, *MyComplementaryPurchase* takes a JSON prediction |
| query, e.g. `{ "items" : ["s2i1"], "num" : 3 }`, and returns a JSON predicted result. |
| In MyComplementaryPurchase/src/main/scala/***Engine.scala***, the `Query` case class |
| defines the format of such a **query**: |
| |
| ```scala |
| case class Query(items: Set[String], num: Int) |
| extends Serializable |
| ``` |
| |
| The `PredictedResult` case class defines the format of the **predicted result**, |
| such as |
| |
| ```json |
| { |
|   "rules":[ |
|     { |
|       "cond":["s2i1"], |
|       "itemScores":[ |
|         { |
|           "item":"s2i2", |
|           "support":0.2, |
|           "confidence":0.9090909090909091, |
|           "lift":3.787878787878788 |
|         }, |
|         { |
|           "item":"s2i3", |
|           "support":0.14, |
|           "confidence":0.6363636363636364, |
|           "lift":3.535353535353535 |
|         } |
|       ] |
|     } |
|   ] |
| } |
| ``` |
| |
| with: |
| |
| ```scala |
| case class PredictedResult(rules: Array[Rule]) |
| extends Serializable |
| |
| case class Rule(cond: Set[String], itemScores: Array[ItemScore]) |
| extends Serializable |
| |
| case class ItemScore( |
| item: String, support: Double, confidence: Double, lift: Double |
| ) extends Serializable |
| ``` |
| |
| Finally, `ComplementaryPurchaseEngine` is the *Engine Factory* that defines the |
| components this engine will use: Data Source, Data Preparator, Algorithm(s) and |
| Serving components. |
| |
| ```scala |
| object ComplementaryPurchaseEngine extends IEngineFactory { |
|   def apply() = { |
|     new Engine( |
|       classOf[DataSource], |
|       classOf[Preparator], |
|       Map("algo" -> classOf[Algorithm]), |
|       classOf[Serving]) |
|   } |
| } |
| ``` |
| |
| Each DASE component of the `ComplementaryPurchaseEngine` will be explained below. |
| |
| ## Data |
| |
| In the DASE architecture, data is prepared by two components sequentially: *DataSource* and *DataPreparator*. They take data |
| from the data store and prepare it for the Algorithm. |
| |
| ### Data Source |
| |
| In MyComplementaryPurchase/src/main/scala/***DataSource.scala***, the `readTraining` |
| method of class `DataSource` reads and selects data from the *Event Store* |
| (data store of the *Event Server*). It returns `TrainingData`. |
| |
| ```scala |
| case class DataSourceParams(appName: String) extends Params |
| |
| class DataSource(val dsp: DataSourceParams) |
| extends PDataSource[TrainingData, |
| EmptyEvaluationInfo, Query, EmptyActualResult] { |
| |
| @transient lazy val logger = Logger[this.type] |
| |
| override |
| def readTraining(sc: SparkContext): TrainingData = { |
| |
| // get all "user" "buy" "item" events |
| val buyEvents: RDD[BuyEvent] = PEventStore.find( |
| appName = dsp.appName, |
| entityType = Some("user"), |
| eventNames = Some(List("buy")), |
| targetEntityType = Some(Some("item")))(sc) |
| .map { event => |
| try { |
| new BuyEvent( |
| user = event.entityId, |
| item = event.targetEntityId.get, |
| t = event.eventTime.getMillis |
| ) |
| } catch { |
| case e: Exception => { |
| logger.error(s"Cannot convert ${event} to BuyEvent. ${e}") |
| throw e |
| } |
| } |
| }.cache() |
| |
| new TrainingData(buyEvents) |
| } |
| } |
| ``` |
| |
| PredictionIO automatically loads the *datasource* parameters specified in MyComplementaryPurchase/***engine.json***, including *appName*, into `dsp`. |
| |
| In ***engine.json***: |
| |
| ``` |
| { |
| ... |
| "datasource": { |
| "params" : { |
| "appName": "MyApp1" |
| } |
| }, |
| ... |
| } |
| ``` |
| |
| In `readTraining()`, `PEventStore` is an object that provides functions to access the data collected by the PredictionIO Event Server. |
| |
| This Complementary Purchase Engine Template requires "buy" events. |
| |
| `PEventStore.find(...)` specifies the events that you want to read. In this case, "user buy item" events are read and then each is mapped to a `BuyEvent` object. |
| |
| The `BuyEvent` case class is defined as: |
| |
| ```scala |
| case class BuyEvent(user: String, item: String, t: Long) |
| ``` |
| |
| `TrainingData` contains an RDD of `BuyEvent` objects. The class definition of `TrainingData` is: |
| |
| ```scala |
| class TrainingData( |
| val buyEvents: RDD[BuyEvent] |
| ) extends Serializable { ... } |
| ``` |
| |
| PredictionIO then passes the returned `TrainingData` object to the *Data Preparator*. |
| |
| NOTE: You could modify the DataSource to read events other than the default **buy** event. |
| |
| ### Data Preparator |
| |
| In MyComplementaryPurchase/src/main/scala/***Preparator.scala***, the `prepare` method |
| of class `Preparator` takes `TrainingData` as its input and performs any |
| necessary feature selection and data processing tasks. At the end, it returns |
| `PreparedData` which should contain the data *Algorithm* needs. |
| |
| By default, `prepare` simply copies the unprocessed `TrainingData` data to `PreparedData`: |
| |
| ```scala |
| class Preparator |
| extends PPreparator[TrainingData, PreparedData] { |
| |
| @transient lazy val logger = Logger[this.type] |
| |
| def prepare(sc: SparkContext, td: TrainingData): PreparedData = { |
| new PreparedData(buyEvents = td.buyEvents) |
| } |
| } |
| |
| class PreparedData( |
| val buyEvents: RDD[BuyEvent] |
| ) extends Serializable |
| ``` |
| |
| PredictionIO passes the returned `PreparedData` object to the Algorithm's `train` function. |
| |
| ## Algorithm |
| |
| In MyComplementaryPurchase/src/main/scala/***Algorithm.scala***, the two methods of |
| the algorithm class are `train` and `predict`. `train` is responsible for |
| training the predictive model; `predict` is |
| responsible for using this model to make predictions. |
| |
| |
| The default algorithm is based on the concept of [Association Rule Learning](http://en.wikipedia.org/wiki/Association_rule_learning). It finds interesting association rules (A implies B), which indicate that an additional item (B) may be bought together with a given list of items (A). A is the *condition* and B is the *consequence*. |
| |
| ### Algorithm parameters |
| |
| The Algorithm takes the following parameters, as defined by the `AlgorithmParams` case class: |
| |
| ```scala |
| case class AlgorithmParams( |
| basketWindow: Int, // in seconds |
| maxRuleLength: Int, |
| minSupport: Double, |
| minConfidence: Double, |
| minLift: Double, |
| minBasketSize: Int, |
| maxNumRulesPerCond: Int // max number of rules per condition |
| ) extends Params |
| |
| ``` |
| |
| Parameter description: |
| |
| - **basketWindow**: A buy event belongs to the same basket as the previous one if the time difference between them is within this window (in seconds). For example, if it is set to 120 and the user buys item B within 2 minutes of the previous purchase (item A), then the item set [A, B] is considered one basket. The purchase of this *basket* is referred to as one *transaction*. |
| - **maxRuleLength**: The maximum length of an association rule. Must be at least 2. For example, the rule "A implies B" has a length of 2, while the rule "A, B implies C" has a length of 3. Increasing this number increases the training time significantly because more combinations are considered. |
| - **minSupport**: The minimum *support* required for an item set to be considered as a rule (valid range is 0 to 1). Support is the fraction of all transactions in which the item set appears. It is used to filter out infrequent item sets. For example, a value of 0.1 means that the item set must appear in at least 10% of all transactions. |
| - **minConfidence**: The minimum *confidence* required for a rule (valid range is 0 to 1). Confidence is the probability that the consequence appears in a transaction, given that the condition appears in it. For example, if A appears in 30 transactions and the item set [A, B] appears in 20 transactions, then the rule "A implies B" has a confidence of about 0.67. |
| - **minLift**: The minimum *lift* required for a rule. It should be set to at least 1 to find high-quality rules. Lift is the confidence of the rule divided by the support of the consequence. It is used to filter out rules whose consequence is very frequent anyway, regardless of the condition. |
| - **minBasketSize**: The minimum number of items a basket must contain to be considered by the algorithm. This value must be at least 2. |
| - **maxNumRulesPerCond**: The maximum number of rules generated per condition and stored in the model. By default, the top rules are sorted by *lift* score. |
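| |
| As a worked example of the three scores, the following minimal sketch computes support, confidence and lift from raw transaction counts. It is plain Scala and not part of the template: the names `RuleMetricsSketch`, `Scores` and `scores` are hypothetical, and the totals of 100 transactions and 40 transactions containing B are assumptions chosen for illustration (only the counts 30 and 20 come from the **minConfidence** example). |
| |
| ```scala |
| object RuleMetricsSketch { |
|   // Hypothetical container for the scores of the rule "cond implies conseq". |
|   final case class Scores(support: Double, confidence: Double, lift: Double) |
| |
|   // total:       number of all transactions |
|   // condCount:   transactions containing the condition (A) |
|   // conseqCount: transactions containing the consequence (B) |
|   // bothCount:   transactions containing both A and B |
|   def scores(total: Long, condCount: Long, conseqCount: Long, bothCount: Long): Scores = { |
|     val support = bothCount.toDouble / total |
|     val confidence = bothCount.toDouble / condCount |
|     // Lift is the confidence divided by the support of the consequence. |
|     val lift = confidence / (conseqCount.toDouble / total) |
|     Scores(support, confidence, lift) |
|   } |
| |
|   def main(args: Array[String]): Unit = { |
|     // Assumed counts: 100 transactions, A in 30, B in 40, [A, B] in 20. |
|     val s = scores(total = 100, condCount = 30, conseqCount = 40, bothCount = 20) |
|     println(s) |
|   } |
| } |
| ``` |
| |
| With these assumed counts, the rule "A implies B" has a support of 0.2, a confidence of about 0.67 and a lift of about 1.67, so it would pass the sample settings of `minSupport` 0.1, `minConfidence` 0.6 and `minLift` 1.0. |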
| |
| INFO: If you import your own data and the engine doesn't return any results, there are two likely causes: (1) The algorithm parameter constraints are too strict and the algorithm couldn't find any rules that satisfy them. Try setting **minSupport**, **minConfidence** and **minLift** to 0 and see whether anything is returned (regardless of recommendation quality), then adjust the parameters accordingly. (2) The Complementary Purchase engine requires buy events with the correct eventTime. If you import data without specifying eventTime, the SDK uses the current time because it assumes the event happens in real time (which is not the case for an offline batch import). As a result, all buy events are treated as one big transaction when they should be treated as multiple transactions. |
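| |
| To see why event times matter, the grouping controlled by **basketWindow** can be sketched on a local collection. This is a minimal illustration, not the template's implementation: `BasketWindowSketch` and `baskets` are hypothetical names, and the sketch assumes that a purchase joins the current basket when it happens no more than `basketWindow` seconds after the same user's previous purchase. |
| |
| ```scala |
| object BasketWindowSketch { |
|   // Same shape as the template's BuyEvent; t is the event time in milliseconds. |
|   final case class BuyEvent(user: String, item: String, t: Long) |
| |
|   // Assumption: an event joins the current basket if it happens within |
|   // `basketWindow` seconds of the user's previous purchase (inclusive). |
|   def baskets(events: Seq[BuyEvent], basketWindow: Int): Seq[Set[String]] = { |
|     val windowMillis = basketWindow * 1000L |
|     events.groupBy(_.user).values.toSeq.flatMap { userEvents => |
|       val sorted = userEvents.sortBy(_.t) |
|       // Build baskets newest-first; each entry keeps the time of its latest event. |
|       sorted.foldLeft(List.empty[(Long, Set[String])]) { |
|         case ((lastT, items) :: rest, e) if e.t - lastT <= windowMillis => |
|           (e.t, items + e.item) :: rest |
|         case (acc, e) => |
|           (e.t, Set(e.item)) :: acc |
|       }.map(_._2) |
|     } |
|   } |
| |
|   def main(args: Array[String]): Unit = { |
|     val events = Seq( |
|       BuyEvent("u1", "A", 0L), |
|       BuyEvent("u1", "B", 60000L),  // 1 minute after A: same basket as A |
|       BuyEvent("u1", "C", 600000L)  // 9 minutes after B: starts a new basket |
|     ) |
|     baskets(events, basketWindow = 120).foreach(println) |
|   } |
| } |
| ``` |
| |
| In this sketch, events that all carry nearly the same time (as happens when eventTime is omitted in a batch import) would end up in a single basket per user, which is the situation described in cause (2). |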
| |
| |
| The values of these parameters can be specified in *algorithms* of |
| MyComplementaryPurchase/***engine.json***: |
| |
| ``` |
| { |
| ... |
| "algorithms": [ |
| { |
| "name": "algo", |
| "params": { |
| "basketWindow" : 120, |
| "maxRuleLength" : 2, |
| "minSupport": 0.1, |
| "minConfidence": 0.6, |
| "minLift" : 1.0, |
| "minBasketSize" : 2, |
| "maxNumRulesPerCond": 5 |
| } |
| } |
| ] |
| ... |
| } |
| ``` |
| |
| PredictionIO automatically loads these values into the constructor of the `Algorithm` class. |
| |
| ```scala |
| class Algorithm(val ap: AlgorithmParams) |
| extends P2LAlgorithm[PreparedData, Model, Query, PredictedResult] { |
| ... |
| } |
| ``` |
| |
| ### train(...) |
| |
| `train` is called when you run **pio train** to train a predictive model. The algorithm first finds all basket transactions, then generates and filters the association rules based on the algorithm parameters: |
| |
| ```scala |
| |
| def train(sc: SparkContext, pd: PreparedData): Model = { |
| val windowMillis = ap.basketWindow * 1000 |
| |
| ... |
| |
| val transactions: RDD[Set[String]] = ... |
| |
| val totalTransaction = transactions.count() |
| val minSupportCount = ap.minSupport * totalTransaction |
| |
| ... |
| |
| // generate item sets |
| val itemSets: RDD[Set[String]] = transactions |
| .flatMap { tran => |
| (1 to ap.maxRuleLength).flatMap(n => tran.subsets(n)) |
| } |
| |
| ... |
| |
| val itemSetCount: RDD[(Set[String], Int)] = ... |
| |
| ... |
| |
| val rules: RDD[(Set[String], RuleScore)] = ... |
| |
| val sortedRules = rules.groupByKey |
| .mapValues(iter => |
| iter.toVector |
| .sortBy(_.lift)(Ordering.Double.reverse) |
| .take(ap.maxNumRulesPerCond) |
| ) |
| .collectAsMap.toMap |
| |
| new Model(sortedRules) |
| } |
| |
| ``` |
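| |
| The item set generation and the support filter can be illustrated on local collections instead of RDDs. The elided steps of `train` are not shown here, so the following is only a sketch under assumptions: `ItemSetCountSketch` and `frequentItemSets` are hypothetical names, and keeping item sets whose count is greater than or equal to `minSupport * totalTransaction` is an assumed filter rule. |
| |
| ```scala |
| object ItemSetCountSketch { |
|   // Counts every item set of size 1..maxRuleLength across all transactions |
|   // and keeps those that reach the minimum support count. |
|   def frequentItemSets( |
|       transactions: Seq[Set[String]], |
|       maxRuleLength: Int, |
|       minSupport: Double): Map[Set[String], Int] = { |
|     val minSupportCount = minSupport * transactions.size |
|     transactions |
|       // Same subset expansion as in train(): all subsets up to maxRuleLength. |
|       .flatMap(tran => (1 to maxRuleLength).flatMap(n => tran.subsets(n))) |
|       .groupBy(identity) |
|       .map { case (itemSet, occurrences) => itemSet -> occurrences.size } |
|       .filter { case (_, count) => count >= minSupportCount } |
|   } |
| |
|   def main(args: Array[String]): Unit = { |
|     val transactions = Seq(Set("A", "B"), Set("A", "B", "C"), Set("A")) |
|     frequentItemSets(transactions, maxRuleLength = 2, minSupport = 0.5) |
|       .foreach(println) |
|   } |
| } |
| ``` |
| |
| The number of subsets grows quickly with `maxRuleLength`, which is why a larger value increases training time. |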
| |
| After training, PredictionIO automatically stores the returned `Model` object. |
| |
| The `Model` stores the top rules for each condition: |
| |
| ```scala |
| class Model( |
| val rules: Map[Set[String], Vector[RuleScore]] |
| ) extends Serializable { |
| ... |
| } |
| ``` |
| |
| ### predict(...) |
| |
| `predict` is called when you send a JSON query to |
| http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "items" : ["s2i1"], "num" : 3 }`, to the `Query` class you defined previously in `Engine.scala`. |
| |
| The `predict()` function does the following: |
| |
| 1. Find all possible subsets of the items in the query. |
| 2. Use each subset as a condition to look up the model and return the rules for each condition. |
| |
| ```scala |
| |
| ... |
| |
| def predict(model: Model, query: Query): PredictedResult = { |
| val conds = (1 to maxCondLength).flatMap(n => query.items.subsets(n)) |
| |
| val rules = conds.map { cond => |
| model.rules.get(cond).map{ vec => |
| val itemScores = vec.take(query.num).map { rs => |
| new ItemScore( |
| item = rs.conseq, |
| support = rs.support, |
| confidence = rs.confidence, |
| lift = rs.lift |
| ) |
| }.toArray |
| Rule(cond = cond, itemScores = itemScores) |
| } |
| }.flatten.toArray |
| |
| new PredictedResult(rules) |
| } |
| |
| ... |
| |
| ``` |
| |
| PredictionIO passes the returned `PredictedResult` object to *Serving*. |
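| |
| The subset lookup performed by `predict()` can be tried out in isolation. The sketch below is a simplified stand-in, not the template's code: `ConditionLookupSketch`, `conditions` and `lookup` are hypothetical names, the model is reduced to a map from condition to consequence items, and `maxCondLength` is passed in as a parameter because its definition is elided in the snippet above. |
| |
| ```scala |
| object ConditionLookupSketch { |
|   // Hypothetical stand-in for the model: condition -> consequences, best first. |
|   val rules: Map[Set[String], Vector[String]] = Map( |
|     Set("s2i1") -> Vector("s2i2", "s2i3"), |
|     Set("s2i1", "s2i2") -> Vector("s2i3") |
|   ) |
| |
|   // Every non-empty subset of the query items with at most maxCondLength items. |
|   def conditions(items: Set[String], maxCondLength: Int): Seq[Set[String]] = |
|     (1 to maxCondLength).flatMap(n => items.subsets(n)) |
| |
|   // Keep only conditions the model has rules for, with at most `num` results each. |
|   def lookup( |
|       items: Set[String], |
|       maxCondLength: Int, |
|       num: Int): Seq[(Set[String], Vector[String])] = |
|     conditions(items, maxCondLength).flatMap { cond => |
|       rules.get(cond).map(conseqs => cond -> conseqs.take(num)) |
|     } |
| |
|   def main(args: Array[String]): Unit = |
|     lookup(Set("s2i1", "s2i2"), maxCondLength = 2, num = 1).foreach(println) |
| } |
| ``` |
| |
| Conditions without a matching entry, such as `Set("s2i2")` in this sketch, simply contribute no rule to the result. |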
| |
| ## Serving |
| |
| The `serve` method of class `Serving` processes the predicted results. It is also |
| responsible for combining multiple predicted results into one if you have more |
| than one predictive model. *Serving* then returns the final predicted result. |
| PredictionIO will convert it to a JSON response automatically. |
| |
| In MyComplementaryPurchase/src/main/scala/***Serving.scala***: |
| |
| ```scala |
| class Serving |
| extends LServing[Query, PredictedResult] { |
| |
| @transient lazy val logger = Logger[this.type] |
| |
| override |
| def serve(query: Query, |
| predictedResults: Seq[PredictedResult]): PredictedResult = { |
| predictedResults.head |
| } |
| } |
| ``` |
| |
| When you send a JSON query to http://localhost:8000/queries.json, |
| `PredictedResult` from all models will be passed to `serve` as a sequence, i.e. |
| `Seq[PredictedResult]`. |
| |
| NOTE: An engine can train multiple models if you specify more than one Algorithm |
| component in `object ComplementaryPurchaseEngine` inside ***Engine.scala*** and the corresponding parameters in ***engine.json***. Since only one `Algorithm` is implemented by default, this `Seq` contains one element. |
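| |
| If you do configure more than one algorithm, `serve` has to decide how to merge the results. The following is one possible strategy, sketched in plain Scala under assumptions: `CombineServingSketch` and `combine` are hypothetical names, the case classes repeat the shapes from ***Engine.scala*** only to keep the sketch self-contained, and keeping the highest-lift entry per item is an illustrative choice rather than the template's behavior. |
| |
| ```scala |
| object CombineServingSketch { |
|   // Same shapes as the template's result classes, repeated for self-containment. |
|   final case class ItemScore(item: String, support: Double, confidence: Double, lift: Double) |
|   final case class Rule(cond: Set[String], itemScores: Array[ItemScore]) |
|   final case class PredictedResult(rules: Array[Rule]) |
| |
|   // Hypothetical strategy: merge rules that share a condition, keep the |
|   // highest-lift entry per item, and sort the entries by lift, highest first. |
|   def combine(results: Seq[PredictedResult]): PredictedResult = { |
|     val merged = results |
|       .flatMap(_.rules.toSeq) |
|       .groupBy(_.cond) |
|       .map { case (cond, sameCond) => |
|         val best = sameCond |
|           .flatMap(_.itemScores.toSeq) |
|           .groupBy(_.item) |
|           .values |
|           .map(_.reduce((a, b) => if (a.lift >= b.lift) a else b)) |
|           .toArray |
|           .sortWith((a, b) => a.lift > b.lift) |
|         Rule(cond, best) |
|       } |
|     PredictedResult(merged.toArray) |
|   } |
| } |
| ``` |
| |
| Inside `serve`, such a helper would replace `predictedResults.head` with a call like `combine(predictedResults)`. |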