| --- |
| title: DASE Components Explained (Lead Scoring) |
| --- |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <%= partial 'shared/dase/dase', locals: { template_name: 'Lead Scoring Engine Template' } %> |
| |
| ## The Engine Design |
| |
| As you can see from the Quick Start, *MyLeadScoring* takes a JSON prediction |
| query, e.g. `{ "landingPageId" : "example.com/page9", "referrerId" : "referrer10.com", "browser": "Firefox" }`, and returns a JSON predicted result. |
| In MyLeadScoring/src/main/scala/***Engine.scala***, the `Query` case class |
| defines the format of such a **query**: |
| |
| ```scala |
| case class Query( |
| landingPageId: String, |
| referrerId: String, |
| browser: String |
| ) extends Serializable |
| ``` |
| |
| The `PredictedResult` case class defines the format of **predicted result**, |
| such as |
| |
| ```json |
| {"score":0.7466666666666667} |
| ``` |
| |
| with: |
| |
| ```scala |
| case class PredictedResult( |
| score: Double |
| ) extends Serializable |
| ``` |
| |
| Finally, `LeadScoringEngine` is the *Engine Factory* that defines the |
| components this engine will use: Data Source, Data Preparator, Algorithm(s) and |
| Serving components. |
| |
| ```scala |
| object LeadScoringEngine extends IEngineFactory { |
| def apply() = { |
| new Engine( |
| classOf[DataSource], |
| classOf[Preparator], |
| Map("randomforest" -> classOf[RFAlgorithm]), |
| classOf[Serving]) |
| } |
| } |
| ``` |
| |
| Each DASE component of the `LeadScoringEngine` will be explained below. |
| |
| By default, Spark's MLlib [RandomForest algorithm](https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests) is used. |
| |
| ## Data |
| |
| In the DASE architecture, data is prepared by two components in sequence: *DataSource* and *DataPreparator*. They read data |
| from the data store and prepare it for the Algorithm. |
| |
| ### Data Source |
| |
| In MyLeadScoring/src/main/scala/***DataSource.scala***, the `readTraining` |
| method of class `DataSource` reads and selects data from the *Event Store* |
| (data store of the *Event Server*). It returns `TrainingData`. |
| |
| ```scala |
| class DataSource(val dsp: DataSourceParams) |
| extends PDataSource[TrainingData, |
| EmptyEvaluationInfo, Query, EmptyActualResult] { |
| |
| @transient lazy val logger = Logger[this.type] |
| |
| override |
| def readTraining(sc: SparkContext): TrainingData = { |
| |
| val viewPage: RDD[(String, Event)] = PEventStore.find( |
| appName = dsp.appName, |
| entityType = Some("user"), |
| eventNames = Some(Seq("view")), |
| // targetEntityType is optional field of an event. |
| targetEntityType = Some(Some("page")))(sc) |
| // PEventStore.find() returns RDD[Event] |
| .map { event => |
| val sessionId = try { |
| event.properties.get[String]("sessionId") |
| } catch { |
| case e: Exception => { |
| logger.error(s"Cannot get sessionId from event ${event}. ${e}.") |
| throw e |
| } |
| } |
| (sessionId, event) |
| } |
| |
| val buyItem: RDD[(String, Event)] = PEventStore.find( |
| appName = dsp.appName, |
| entityType = Some("user"), |
| eventNames = Some(Seq("buy")), |
| // targetEntityType is optional field of an event. |
| targetEntityType = Some(Some("item")))(sc) |
| // PEventStore.find() returns RDD[Event] |
| .map { event => |
| val sessionId = try { |
| event.properties.get[String]("sessionId") |
| } catch { |
| case e: Exception => { |
| logger.error(s"Cannot get sessionId from event ${event}. ${e}.") |
| throw e |
| } |
| } |
| (sessionId, event) |
| } |
| |
| val session: RDD[Session] = viewPage.cogroup(buyItem) |
| .map { case (sessionId, (viewIter, buyIter)) => |
| // the first view event of the session is the landing event |
| val landing = viewIter.reduce{ (a, b) => |
| if (a.eventTime.isBefore(b.eventTime)) a else b |
| } |
| // any buy after landing |
| val buy = buyIter.filter( b => b.eventTime.isAfter(landing.eventTime)) |
| .nonEmpty |
| |
| try { |
| new Session( |
| landingPageId = landing.targetEntityId.get, |
| referrerId = landing.properties.getOrElse[String]("referrerId", ""), |
| browser = landing.properties.getOrElse[String]("browser", ""), |
| buy = buy |
| ) |
| } catch { |
| case e: Exception => { |
| logger.error(s"Cannot create session data from ${landing}. ${e}.") |
| throw e |
| } |
| } |
| }.cache() |
| |
| new TrainingData(session) |
| } |
| } |
| ``` |
| |
| PredictionIO automatically loads the parameters of *datasource* specified in MyLeadScoring/***engine.json***, including *appName*, to `dsp`. |
| |
| In ***engine.json***: |
| |
| ``` |
| { |
| ... |
| "datasource": { |
| "params" : { |
| "appName": "MyApp1" |
| } |
| }, |
| ... |
| } |
| ``` |
| |
| In `readTraining()`, `PEventStore` is an object that provides functions to access the data collected by the PredictionIO Event Server. |
| |
| This Lead Scoring Engine Template requires "view" and "buy" events, each with a `sessionId` in its event properties. |
| |
| `PEventStore.find(...)` specifies the events that you want to read. In this case, "user view page" and "user buy item" events are read, and each event is mapped to a tuple of (sessionId, event). The events are then "cogrouped" by sessionId to extract information about each session, such as the first page view (the landing page view) and whether the user converts (a buy event after landing). The result is an RDD of `Session`, which is wrapped in `TrainingData`: |
| |
| ```scala |
| case class Session( |
| landingPageId: String, |
| referrerId: String, |
| browser: String, |
| buy: Boolean // buy or not |
| ) extends Serializable |
| |
| class TrainingData( |
| val session: RDD[Session] |
| ) extends Serializable |
| |
| ``` |
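
The grouping logic described above can be tried out without Spark. The following is a minimal sketch that uses plain Scala collections in place of RDDs and `cogroup`. `SimpleEvent` and `buildSessions` are hypothetical names introduced only for illustration; they are not part of the template or of PredictionIO. Unlike the `cogroup` version, this sketch only considers sessions that have at least one view event.

```scala
import java.time.Instant

object SessionSketch {
  // Simplified stand-in for a PredictionIO event (illustrative only).
  case class SimpleEvent(
    sessionId: String,
    targetEntityId: String,
    eventTime: Instant,
    properties: Map[String, String] = Map.empty)

  case class Session(
    landingPageId: String,
    referrerId: String,
    browser: String,
    buy: Boolean)

  def buildSessions(
    views: Seq[SimpleEvent],
    buys: Seq[SimpleEvent]): Seq[Session] = {
    val buysBySession = buys.groupBy(_.sessionId)
    views.groupBy(_.sessionId).toSeq.map { case (sessionId, sessionViews) =>
      // the earliest view event of the session is the landing event
      val landing = sessionViews.minBy(_.eventTime.toEpochMilli)
      // the session converts if any buy event happens after landing
      val buy = buysBySession
        .getOrElse(sessionId, Seq.empty[SimpleEvent])
        .exists(_.eventTime.isAfter(landing.eventTime))
      Session(
        landingPageId = landing.targetEntityId,
        // missing properties fall back to "", as in readTraining()
        referrerId = landing.properties.getOrElse("referrerId", ""),
        browser = landing.properties.getOrElse("browser", ""),
        buy = buy)
    }
  }
}
```

Calling `SessionSketch.buildSessions(views, buys)` returns one `Session` per distinct `sessionId` found in `views`. The order of the returned sessions is not guaranteed, because it comes from a `groupBy`.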
| |
| PredictionIO then passes the returned `TrainingData` object to *Data Preparator*. |
| |
| NOTE: You can modify the DataSource to read an event other than the default **buy** if your definition of conversion is not the "buy item" event. |
| |
| ### Data Preparator |
| |
| In MyLeadScoring/src/main/scala/***Preparator.scala***, the `prepare` method |
| of class `Preparator` takes `TrainingData` as its input and performs any |
| necessary feature selection and data processing tasks. At the end, it returns |
| `PreparedData` which should contain the data *Algorithm* needs. |
| |
| In this template, `prepare` will select the features from the Session object and convert them to the data required by the MLlib's RandomForest algorithm. |
| |
| The `PreparedData` is defined as: |
| |
| ```scala |
| class PreparedData( |
| val labeledPoints: RDD[LabeledPoint], |
| val featureIndex: Map[String, Int], |
| val featureCategoricalIntMap: Map[String, Map[String, Int]] |
| ) extends Serializable |
| ``` |
| |
| The `LabeledPoint` class is defined in Spark MLlib and is required by the RandomForest algorithm. `featureIndex` is a Map from feature name to its position index in the feature vector. `featureCategoricalIntMap` is a Map from each categorical feature name to that feature's Map of categorical value to integer. |
| |
| By default, the features used are "landingPage", "referrer" and "browser". Since these features contain categorical values, we need to create a map from categorical values to integer values for the algorithm to use. |
| |
| NOTE: You can customize the template to use other features. |
| |
| For example, if the feature "landingPage" can take any of the values "page1", "page2", "page3" and "page4", we can create a categorical Int value Map such as: |
| |
| ```scala |
| Map( |
| "page1" -> 0, |
| "page2" -> 1, |
| "page3" -> 2, |
| "page4" -> 3 |
| ) |
| ``` |
| |
| Instead of creating such a Map manually, you can use the helper method `createCategoricalIntMap()`, which is defined in **Preparator.scala** for this purpose. |
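
The helper's body is not shown on this page. As a rough illustration of what such a helper could look like, here is a minimal sketch. It assumes the signature implied by the calls in `prepare()` (an array of values plus a default value); the template's actual implementation may differ.

```scala
object CategoricalMapSketch {
  // Hypothetical re-implementation for illustration only;
  // the helper in the template may be written differently.
  def createCategoricalIntMap(
    values: Array[String],  // categorical values seen in the data
    default: String         // value to fall back to for unseen input
  ): Map[String, Int] = {
    // assign each distinct value its position as the integer code
    val m = values.distinct.zipWithIndex.toMap
    // make sure the default value always has an integer code too
    if (m.contains(default)) m else m + (default -> m.size)
  }
}
```

With this sketch, `CategoricalMapSketch.createCategoricalIntMap(Array("page1", "page2", "page3", "page4"), "")` yields the four-entry map shown above plus an extra entry `"" -> 4` for the default value.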
| |
| Each `LabeledPoint` consists of a label and a feature vector. The position of each feature in the vector is defined by the `featureIndex` Map. By default, it's defined as |
| |
| ```scala |
| val featureIndex = Map( |
| "landingPage" -> 0, |
| "referrer" -> 1, |
| "browser" -> 2 |
| ) |
| ``` |
| |
| which means that index 0 of the feature vector is the "landingPage" feature, index 1 is the "referrer" feature, and so on. |
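
To make the layout concrete, the sketch below encodes one session into a label and a feature array using plain Scala, without Spark or MLlib. The `encode` function and the `(Double, Array[Double])` return type are simplifications chosen for this example; in the template the result is an MLlib `LabeledPoint`.

```scala
object FeatureVectorSketch {
  case class Session(
    landingPageId: String,
    referrerId: String,
    browser: String,
    buy: Boolean)

  // position of each feature in the vector
  val featureIndex: Map[String, Int] = Map(
    "landingPage" -> 0,
    "referrer" -> 1,
    "browser" -> 2)

  // Returns (label, features). Hypothetical helper for illustration.
  def encode(
    session: Session,
    featureCategoricalIntMap: Map[String, Map[String, Int]]
  ): (Double, Array[Double]) = {
    // 1.0 means the session converted, 0.0 means it did not
    val label = if (session.buy) 1.0 else 0.0
    val feature = new Array[Double](featureIndex.size)
    feature(featureIndex("landingPage")) =
      featureCategoricalIntMap("landingPage")(session.landingPageId).toDouble
    feature(featureIndex("referrer")) =
      featureCategoricalIntMap("referrer")(session.referrerId).toDouble
    feature(featureIndex("browser")) =
      featureCategoricalIntMap("browser")(session.browser).toDouble
    (label, feature)
  }
}
```

Note that this sketch throws a `NoSuchElementException` if a session contains a value that is missing from the categorical maps, so the maps need to be built from the same data that is being encoded.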
| |
| The `prepare()` method of the `Preparator` class first finds all possible categorical values for each feature and creates a categorical Int map. Then it converts each `Session` object to a `LabeledPoint` by creating the feature vector and the label. In this case, the label is 1 if there is a conversion and 0 if there is none: |
| |
| ```scala |
| class Preparator extends PPreparator[TrainingData, PreparedData] { |
| |
| ... |
| |
| def prepare(sc: SparkContext, td: TrainingData): PreparedData = { |
| |
| // find out all values of each feature |
| val landingValues = td.session.map(_.landingPageId).distinct.collect |
| val referrerValues = td.session.map(_.referrerId).distinct.collect |
| val browserValues = td.session.map(_.browser).distinct.collect |
| |
| // map feature value to integer for each categorical feature |
| val featureCategoricalIntMap = Map( |
| "landingPage" -> createCategoricalIntMap(landingValues, ""), |
| "referrer" -> createCategoricalIntMap(referrerValues, ""), |
| "browser" -> createCategoricalIntMap(browserValues, "") |
| ) |
| // index position of each feature in the vector |
| val featureIndex = Map( |
| "landingPage" -> 0, |
| "referrer" -> 1, |
| "browser" -> 2 |
| ) |
| |
| // inject some default to cover default cases |
| val defaults = Seq( |
| new Session( |
| landingPageId = "", |
| referrerId = "", |
| browser = "", |
| buy = false |
| ), |
| new Session( |
| landingPageId = "", |
| referrerId = "", |
| browser = "", |
| buy = true |
| )) |
| |
| val defaultRDD = sc.parallelize(defaults) |
| val sessionRDD = td.session.union(defaultRDD) |
| |
| val labeledPoints: RDD[LabeledPoint] = sessionRDD.map { session => |
| logger.debug(s"${session}") |
| val label = if (session.buy) 1.0 else 0.0 |
| |
| val feature = new Array[Double](featureIndex.size) |
| feature(featureIndex("landingPage")) = |
| featureCategoricalIntMap("landingPage")(session.landingPageId).toDouble |
| feature(featureIndex("referrer")) = |
| featureCategoricalIntMap("referrer")(session.referrerId).toDouble |
| feature(featureIndex("browser")) = |
| featureCategoricalIntMap("browser")(session.browser).toDouble |
| |
| LabeledPoint(label, Vectors.dense(feature)) |
| }.cache() |
| |
| logger.debug(s"labeledPoints count: ${labeledPoints.count()}") |
| new PreparedData( |
| labeledPoints = labeledPoints, |
| featureIndex = featureIndex, |
| featureCategoricalIntMap = featureCategoricalIntMap) |
| } |
| } |
| ``` |
| |
| PredictionIO passes the returned `PreparedData` object to Algorithm's `train` function. |
| |
| ## Algorithm |
| |
| In MyLeadScoring/src/main/scala/***RFAlgorithm.scala***, the two methods of |
| the algorithm class are `train` and `predict`. `train` is responsible for |
| training the predictive model; `predict` is |
| responsible for using this model to make predictions. |
| |
| The default algorithm is Spark's MLlib [RandomForest algorithm](https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests). |
| |
| ### Algorithm parameters |
| |
| The Algorithm takes the following parameters, as defined by the `RFAlgorithmParams` case class: |
| |
| ```scala |
| case class RFAlgorithmParams( |
| numTrees: Int, |
| featureSubsetStrategy: String, |
| impurity: String, |
| maxDepth: Int, |
| maxBins: Int, |
| seed: Option[Int] |
| ) extends Params |
| ``` |
| |
| You can find more description of the parameters in MLlib's [RandomForest documentation](https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests) and [Decision Tree documentation](https://spark.apache.org/docs/latest/mllib-decision-tree.html). |
| |
| The values of these parameters can be specified in *algorithms* of |
| MyLeadScoring/***engine.json***: |
| |
| ``` |
| { |
| ... |
| "algorithms": [ |
| { |
| "name": "randomforest", |
| "params": { |
| "numClasses": 3, |
| "numTrees": 5, |
| "featureSubsetStrategy": "auto", |
| "impurity": "variance", |
| "maxDepth": 4, |
| "maxBins": 100, |
| "seed" : 12345 |
| } |
| } |
| ] |
| ... |
| } |
| ``` |
| |
| PredictionIO automatically loads these values into the constructor of the `RFAlgorithm` class. |
| |
| ```scala |
| class RFAlgorithm(val ap: RFAlgorithmParams) |
| extends P2LAlgorithm[PreparedData, RFModel, Query, PredictedResult] { |
| ... |
| } |
| ``` |
| |
| ### train(...) |
| |
| `train` is called when you run **pio train** to train a predictive model. |
| |
| The algorithm first generates `categoricalFeaturesInfo`, which is required by MLlib. It indicates how many categorical values each categorical feature has. Then it calls `RandomForest.trainRegressor()` to train a `RandomForestModel` that predicts the probability that the user converts. |
| |
| ```scala |
| |
| def train(sc: SparkContext, pd: PreparedData): RFModel = { |
| |
| val categoricalFeaturesInfo = pd.featureCategoricalIntMap |
| .map { case (f, m) => |
| (pd.featureIndex(f), m.size) |
| } |
| |
| logger.info(s"categoricalFeaturesInfo: ${categoricalFeaturesInfo}") |
| |
| // use random seed if seed is not specified |
| val seed = ap.seed.getOrElse(scala.util.Random.nextInt()) |
| |
| val forestModel: RandomForestModel = RandomForest.trainRegressor( |
| input = pd.labeledPoints, |
| categoricalFeaturesInfo = categoricalFeaturesInfo, |
| numTrees = ap.numTrees, |
| featureSubsetStrategy = ap.featureSubsetStrategy, |
| impurity = ap.impurity, |
| maxDepth = ap.maxDepth, |
| maxBins = ap.maxBins, |
| seed = seed) |
| |
| new RFModel( |
| forest = forestModel, |
| featureIndex = pd.featureIndex, |
| featureCategoricalIntMap = pd.featureCategoricalIntMap |
| ) |
| } |
| |
| ``` |
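
The `categoricalFeaturesInfo` step can be checked in isolation with plain Scala maps. The sketch below mirrors the mapping used in `train` and adds a small sanity check: MLlib's decision tree documentation states that `maxBins` must be at least the number of categories of any categorical feature. `maxBinsIsLargeEnough` is a hypothetical helper written for this example and is not part of the template.

```scala
object CategoricalInfoSketch {
  // feature position -> number of categorical values for that feature
  def categoricalFeaturesInfo(
    featureIndex: Map[String, Int],
    featureCategoricalIntMap: Map[String, Map[String, Int]]
  ): Map[Int, Int] =
    featureCategoricalIntMap.map { case (f, m) =>
      (featureIndex(f), m.size)
    }

  // Hypothetical pre-flight check before training:
  // every categorical feature must fit into maxBins.
  def maxBinsIsLargeEnough(info: Map[Int, Int], maxBins: Int): Boolean =
    info.values.forall(_ <= maxBins)
}
```

If your data contains more distinct landing pages, referrers or browsers than the configured `maxBins`, a check like this would fail, which suggests raising `maxBins` in ***engine.json*** before running **pio train**.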
| |
| PredictionIO will automatically store the returned model after training. |
| |
| The `RFModel` stores the `RandomForestModel`, and the `featureIndex` and `featureCategoricalIntMap`: |
| |
| ```scala |
| class RFModel( |
| val forest: RandomForestModel, |
| val featureIndex: Map[String, Int], |
| val featureCategoricalIntMap: Map[String, Map[String, Int]] |
| ) extends Serializable { |
| ... |
| } |
| ``` |
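
Because the labels are 0 and 1 and the forest is trained as a regressor, its output can be read as a score between 0 and 1. According to the MLlib ensembles documentation, a regression forest predicts the average of its trees' predictions. The sketch below illustrates only that averaging step; `Tree` and `predict` are stand-ins invented for this example and are not MLlib types.

```scala
object ForestAverageSketch {
  // Each function stands in for one trained regression tree.
  type Tree = Array[Double] => Double

  // A regression forest averages the predictions of its trees.
  def predict(trees: Seq[Tree], features: Array[Double]): Double = {
    require(trees.nonEmpty, "at least one tree is required")
    trees.map(tree => tree(features)).sum / trees.size
  }
}
```

If every tree returns a value between 0 and 1, the average also lies between 0 and 1, which is why a result such as `{"score":0.7466666666666667}` can be interpreted as an estimated conversion probability.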
| |
| ### predict(...) |
| |
| `predict` is called when you send a JSON query to |
| http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "landingPageId" : "example.com/page9", "referrerId" : "referrer10.com", "browser": "Firefox" }`, to the `Query` class you defined previously in `Engine.scala`. |
| |
| The `predict()` function does the following: |
| |
| 1. Convert the `Query` to the required feature vector input. |
| 2. Use the `RandomForestModel` to predict the probability of conversion given this feature vector. |
| |
| ```scala |
| |
| ... |
| |
| def predict(model: RFModel, query: Query): PredictedResult = { |
| |
| val featureIndex = model.featureIndex |
| val featureCategoricalIntMap = model.featureCategoricalIntMap |
| |
| val landingPageId = query.landingPageId |
| val referrerId = query.referrerId |
| val browser = query.browser |
| |
| // look up categorical feature Int for landingPageId |
| val landingFeature = lookupCategoricalInt( |
| featureCategoricalIntMap = featureCategoricalIntMap, |
| feature = "landingPage", |
| value = landingPageId, |
| default = "" |
| ).toDouble |
| |
| |
| // look up categorical feature Int for referrerId |
| val referrerFeature = lookupCategoricalInt( |
| featureCategoricalIntMap = featureCategoricalIntMap, |
| feature = "referrer", |
| value = referrerId, |
| default = "" |
| ).toDouble |
| |
| // look up categorical feature Int for browser |
| val browserFeature = lookupCategoricalInt( |
| featureCategoricalIntMap = featureCategoricalIntMap, |
| feature = "browser", |
| value = browser, |
| default = "" |
| ).toDouble |
| |
| // create feature Array |
| val feature = new Array[Double](model.featureIndex.size) |
| feature(featureIndex("landingPage")) = landingFeature |
| feature(featureIndex("referrer")) = referrerFeature |
| feature(featureIndex("browser")) = browserFeature |
| |
| val score = model.forest.predict(Vectors.dense(feature)) |
| new PredictedResult(score) |
| } |
| |
| ... |
| |
| ``` |
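
The body of `lookupCategoricalInt` is elided above. A minimal sketch of how such a lookup could work is shown below, assuming that it falls back to the integer of the `default` value when the queried value was not seen during training. This is an illustration based on the call sites in `predict`, not necessarily the template's exact code.

```scala
object LookupSketch {
  // Hypothetical re-implementation for illustration only.
  def lookupCategoricalInt(
    featureCategoricalIntMap: Map[String, Map[String, Int]],
    feature: String,  // feature name, e.g. "browser"
    value: String,    // value from the query
    default: String   // value to use when `value` is unknown
  ): Int = {
    val m = featureCategoricalIntMap(feature)
    // unseen values map to the integer of the default value
    m.getOrElse(value, m(default))
  }
}
```

A fallback of this kind is what would allow a query with, say, a browser that never appeared in the training data to still produce a score instead of failing.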
| |
| PredictionIO passes the returned `PredictedResult` object to *Serving*. |
| |
| ## Serving |
| |
| The `serve` method of class `Serving` processes the predicted results. It is also |
| responsible for combining multiple predicted results into one if you have more |
| than one predictive model. *Serving* then returns the final predicted result. |
| PredictionIO will convert it to a JSON response automatically. |
| |
| In MyLeadScoring/src/main/scala/***Serving.scala***: |
| |
| ```scala |
| class Serving extends LServing[Query, PredictedResult] { |
| |
| @transient lazy val logger = Logger[this.type] |
| |
| override |
| def serve(query: Query, |
| predictedResults: Seq[PredictedResult]): PredictedResult = { |
| predictedResults.head |
| } |
| } |
| ``` |
| |
| When you send a JSON query to http://localhost:8000/queries.json, |
| `PredictedResult` from all models will be passed to `serve` as a sequence, i.e. |
| `Seq[PredictedResult]`. |
| |
| NOTE: An engine can train multiple models if you specify more than one Algorithm |
| component in `object LeadScoringEngine` inside ***Engine.scala*** and the corresponding parameters in ***engine.json***. Since only one algorithm is implemented by default, this `Seq` contains one element. |
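
If you do add a second algorithm, `serve` has to decide how to combine the results. One possible approach, shown here as a sketch rather than as the template's behavior, is to average the scores. `combine` is a hypothetical function and `PredictedResult` is redefined locally so the example runs on its own.

```scala
object ServingSketch {
  // Local copy of the template's result class, for a standalone example.
  case class PredictedResult(score: Double)

  // Averages the scores of all models (one possible combination rule).
  def combine(predictedResults: Seq[PredictedResult]): PredictedResult = {
    require(predictedResults.nonEmpty, "no predicted results to combine")
    val mean = predictedResults.map(_.score).sum / predictedResults.size
    PredictedResult(mean)
  }
}
```

With a single algorithm, this returns the same result as `predictedResults.head`, so the default behavior is preserved.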