---
title: DASE Components Explained (Recommendation)
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<%= partial 'shared/dase/dase',
    locals: {
      template_name: 'Recommendation Engine Template',
      evaluation_link: '/templates/recommendation/evaluation/'
    } %>

## The Engine Design

As you can see from the Quick Start, *MyRecommendation* takes a JSON prediction
query, e.g. `{ "user": "1", "num": 4 }`, and returns a JSON predicted result.
In MyRecommendation/src/main/scala/***Engine.scala***, the `Query` case class
defines the format of such a **query**:

```scala
case class Query(
  user: String,
  num: Int
)
```

The `PredictedResult` case class defines the format of the **predicted
result**, such as

```json
{"itemScores":[
  {"item":"22","score":4.07},
  {"item":"62","score":4.05},
  {"item":"75","score":4.04},
  {"item":"68","score":3.81}
]}
```

with:

```scala
case class PredictedResult(
  itemScores: Array[ItemScore]
)

case class ItemScore(
  item: String,
  score: Double
)
```
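
For instance, the JSON result above corresponds to a value like this (a sketch
built from the scores shown earlier):

```scala
PredictedResult(itemScores = Array(
  ItemScore(item = "22", score = 4.07),
  ItemScore(item = "62", score = 4.05),
  ItemScore(item = "75", score = 4.04),
  ItemScore(item = "68", score = 3.81)
))
```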

Finally, `RecommendationEngine` is the *Engine Factory* that defines the
components this engine will use: the Data Source, Data Preparator, Algorithm(s)
and Serving components.

```scala
object RecommendationEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("als" -> classOf[ALSAlgorithm]),
      classOf[Serving])
  }
  ...
}
```
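
Each key in the algorithm `Map` (here `"als"`) must match a `"name"` field
under `algorithms` in *engine.json* (shown later). If you implemented a second
algorithm class, the factory could register it alongside ALS; a sketch with a
hypothetical `MyOtherAlgorithm`:

```scala
Map(
  "als" -> classOf[ALSAlgorithm],
  "other" -> classOf[MyOtherAlgorithm]) // hypothetical second algorithm
```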

### Spark MLlib

Spark MLlib's ALS algorithm takes training data of RDD type, i.e. `RDD[Rating]`,
and trains a model, which is a `MatrixFactorizationModel` object.
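
Outside PredictionIO, a bare-bones invocation looks roughly like this (a sketch
only; it assumes an existing `SparkContext` named `sc`):

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// MLlib's Rating takes Int user and item IDs
val ratings: RDD[Rating] = sc.parallelize(Seq(
  Rating(1, 10, 4.0),
  Rating(1, 20, 3.0),
  Rating(2, 10, 5.0)))

// rank = 10 latent features, 20 iterations, regularization lambda = 0.01
val model: MatrixFactorizationModel = ALS.train(ratings, 10, 20, 0.01)
```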

The PredictionIO Recommendation Engine Template, on which *MyRecommendation* is
based, integrates this algorithm under the DASE architecture. We will take a
closer look at the DASE code below.

INFO: [Check this
out](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) to
learn more about MLlib's ALS collaborative filtering algorithm.


## Data

In the DASE architecture, data is prepared by two components sequentially: the
*Data Source* and the *Data Preparator*. Together they take data from the data
store and prepare `RDD[Rating]` for the ALS algorithm.

### Data Source

In MyRecommendation/src/main/scala/***DataSource.scala***, the `readTraining`
method of class `DataSource` reads and selects data from the *Event Store* (the
data store of the *Event Server*) and returns `TrainingData`.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  def getRatings(sc: SparkContext): RDD[Rating] = {

    val eventsRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("rate", "buy")), // read "rate" and "buy" events
      // targetEntityType is an optional field of an event.
      targetEntityType = Some(Some("item")))(sc)

    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
      val rating = try {
        val ratingValue: Double = event.event match {
          case "rate" => event.properties.get[Double]("rating")
          case "buy" => 4.0 // map a buy event to a rating value of 4
          case _ => throw new Exception(s"Unexpected event ${event} is read.")
        }
        // entityId and targetEntityId are String
        Rating(event.entityId,
          event.targetEntityId.get,
          ratingValue)
      } catch {
        case e: Exception => {
          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
          throw e
        }
      }
      rating
    }.cache()

    ratingsRDD
  }

  override
  def readTraining(sc: SparkContext): TrainingData = {
    new TrainingData(getRatings(sc))
  }

}
```

`PEventStore` is an object that provides functions to access data collected by
the PredictionIO *Event Server*. `PEventStore.find(...)` specifies the events
that you want to read. PredictionIO automatically loads the parameters of
*datasource* specified in MyRecommendation/***engine.json***, including
*appName*, into `dsp`.

In ***engine.json***:

```
{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}
```

Each *rate* and *buy* user event is read as a `Rating`.
For flexibility, this Recommendation Engine Template is designed to support
user and item IDs of type `String`. Since Spark MLlib's `Rating` class assumes
`Int`-only user and item IDs, the template defines its own `Rating` class:

```scala
case class Rating(
  user: String,
  item: String,
  rating: Double
)
```
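
For reference, a *rate* event as collected by the Event Server might look like
this (the IDs and rating value here are hypothetical):

```json
{
  "event": "rate",
  "entityType": "user",
  "entityId": "1",
  "targetEntityType": "item",
  "targetEntityId": "22",
  "properties": { "rating": 4.0 }
}
```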

`TrainingData` contains an RDD of all these `Rating` events. The class
definition of `TrainingData` is:

```scala
class TrainingData(
  val ratings: RDD[Rating]
) extends Serializable {...}
```

PredictionIO passes the returned `TrainingData` object to the *Data Preparator*.

<!-- TODO
> HOW-TO:
>
> You may modify readTraining function to read from other datastores, such as MongoDB - [link]
-->

INFO: You could [modify the DataSource to read custom events](reading-custom-events.html) other than the default **rate** and **buy**.

### Data Preparator

In MyRecommendation/src/main/scala/***Preparator.scala***, the `prepare` method
of class `Preparator` takes `TrainingData` as its input and performs any
necessary feature selection and data processing tasks. At the end, it returns
`PreparedData`, which should contain the data the *Algorithm* needs. For MLlib
ALS, that is `RDD[Rating]`.

By default, `prepare` simply copies the unprocessed `TrainingData` to
`PreparedData`:

```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(ratings = trainingData.ratings)
  }
}

class PreparedData(
  val ratings: RDD[Rating]
) extends Serializable
```

PredictionIO passes the returned `PreparedData` object to the *Algorithm*'s
`train` function.
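
If you did want real preprocessing, this is the place for it. For example, a
minimal sketch (the 2.0 cutoff is a hypothetical choice, not part of the
template) that drops low ratings before training:

```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    // keep only ratings at or above a (hypothetical) threshold of 2.0
    new PreparedData(ratings = trainingData.ratings.filter(_.rating >= 2.0))
  }
}
```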

<!-- TODO
> HOW-TO:
>
> MLlib ALS limitation: user id, item id must be integer - convert [link]
-->

## Algorithm

In MyRecommendation/src/main/scala/***ALSAlgorithm.scala***, the two methods of
the algorithm class are `train` and `predict`. `train` is responsible for
training a predictive model; PredictionIO stores this model, and `predict` uses
it to make predictions.

### train(...)

`train` is called when you run **pio train**. This is where the MLlib ALS
algorithm, i.e. `ALS.train`, is used to train a predictive model.


```scala
def train(sc: SparkContext, data: PreparedData): ALSModel = {
  ...
  // Convert user and item String IDs to Int index for MLlib
  val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
  val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
  val mllibRatings = data.ratings.map( r =>
    // MLlibRating requires integer index for user and item
    MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
  )

  // seed for MLlib ALS
  val seed = ap.seed.getOrElse(System.nanoTime)

  // If you only have one type of implicit event (e.g. "view" event only),
  // replace ALS.train(...) with
  //val m = ALS.trainImplicit(
  //  ratings = mllibRatings,
  //  rank = ap.rank,
  //  iterations = ap.numIterations,
  //  lambda = ap.lambda,
  //  blocks = -1,
  //  alpha = 1.0,
  //  seed = seed)

  val m = ALS.train(
    ratings = mllibRatings,
    rank = ap.rank,
    iterations = ap.numIterations,
    lambda = ap.lambda,
    blocks = -1,
    seed = seed)

  new ALSModel(
    rank = m.rank,
    userFeatures = m.userFeatures,
    productFeatures = m.productFeatures,
    userStringIntMap = userStringIntMap,
    itemStringIntMap = itemStringIntMap)
}
```

#### Working with Spark MLlib's ALS.train(...)

As mentioned above, MLlib's `Rating` does not support `String` user and item
IDs; its `ALS.train` thus also assumes `Int`-only `Rating`s.

Here you need to map your String-supported `Rating` to MLlib's Integer-only
`Rating`. First, you can rename MLlib's Integer-only `Rating` to `MLlibRating`
for clarity:

```
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
```

You then create bi-directional maps with `BiMap.stringInt`, which map each
distinct `String` ID to an `Int` index.

```
val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
```
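
As an illustration of the round trip (the index value is arbitrary; it is
assigned by `BiMap.stringInt`):

```scala
val userInt: Int = userStringIntMap("1")                // e.g. 0
val userId: String = userStringIntMap.inverse(userInt)  // back to "1"
```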
Finally, you re-create each `Rating` event as `MLlibRating`:

```
MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
```


In addition to `RDD[MLlibRating]`, `ALS.train` takes the following parameters:
*rank*, *iterations*, *lambda* and *seed*.

The values of these parameters are specified in *algorithms* of
MyRecommendation/***engine.json***:

```
{
  ...
  "algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations": 20,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
  ...
}
```

PredictionIO automatically loads these values into the constructor argument
`ap`, which has a corresponding case class `ALSAlgorithmParams`:

```scala
case class ALSAlgorithmParams(
  rank: Int,
  numIterations: Int,
  lambda: Double,
  seed: Option[Long]) extends Params
```

The `seed` parameter is optional. It is used internally by the MLlib ALS
algorithm to generate random values. If `seed` is not specified, the current
system time is used, and hence each training run may produce different results.
Specify a fixed value for `seed` if you want deterministic results (for
example, when you are testing).

`ALS.train` then returns a `MatrixFactorizationModel`, which contains RDD data.
An RDD is a distributed collection of items that *does not* persist on its own.
To store the model, it is converted to the `ALSModel` class at the end.
`ALSModel` is a persistable class that extends `MatrixFactorizationModel`.

> The detailed implementation can be found at
MyRecommendation/src/main/scala/***ALSModel.scala***
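
Its constructor mirrors the call in `train` above; a simplified sketch of the
class shape (the persistence plumbing in ***ALSModel.scala*** is omitted here):

```scala
// a simplified sketch only; the real ALSModel also mixes in PredictionIO's
// persistence support so that the trained model can be saved and loaded
class ALSModel(
    override val rank: Int,
    override val userFeatures: RDD[(Int, Array[Double])],
    override val productFeatures: RDD[(Int, Array[Double])],
    val userStringIntMap: BiMap[String, Int],
    val itemStringIntMap: BiMap[String, Int])
  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
```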


PredictionIO will automatically store the returned model, i.e. `ALSModel` in
this case.


### predict(...)

`predict` is called when you send a JSON query to
http://localhost:8000/queries.json. PredictionIO converts the query, such as
`{ "user": "1", "num": 4 }`, to the `Query` class you defined previously.

The predictive model `MatrixFactorizationModel` of MLlib ALS, extended here as
`ALSModel`, offers a method called `recommendProducts`. It takes two
parameters: a user id (i.e. the `Int` index of `query.user`) and the number of
items to be returned (i.e. `query.num`). It predicts the top *num* items the
user will like.

```scala
def predict(model: ALSModel, query: Query): PredictedResult = {
  // Convert String ID to Int index for MLlib
  model.userStringIntMap.get(query.user).map { userInt =>
    // create inverse view of itemStringIntMap
    val itemIntStringMap = model.itemStringIntMap.inverse
    // recommendProducts() returns Array[MLlibRating], which uses item Int
    // index. Convert it to String ID for returning PredictedResult
    val itemScores = model.recommendProducts(userInt, query.num)
      .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
    PredictedResult(itemScores)
  }.getOrElse{
    logger.info(s"No prediction for unknown user ${query.user}.")
    PredictedResult(Array.empty)
  }
}
```

Note that `recommendProducts` returns the `Int` indices of items. You map them
back to `String` with `itemIntStringMap` before they are returned.

> You have defined the class `PredictedResult` earlier.

PredictionIO passes the returned `PredictedResult` object to *Serving*.

## Serving

The `serve` method of class `Serving` processes the predicted result. It is
also responsible for combining multiple predicted results into one if you have
more than one predictive model. *Serving* then returns the final predicted
result. PredictionIO will convert it to a JSON response automatically.

In MyRecommendation/src/main/scala/***Serving.scala***,

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```

When you send a JSON query to http://localhost:8000/queries.json,
`PredictedResult` from all models will be passed to `serve` as a sequence, i.e.
`Seq[PredictedResult]`.

> An engine can train multiple models if you specify more than one Algorithm
component in `object RecommendationEngine` inside ***Engine.scala***. Since only
one `ALSAlgorithm` is implemented by default, this `Seq` contains one element.
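
If you did register more than one algorithm, `serve` is where you would combine
their outputs. A minimal sketch (not part of the template) that averages each
item's score across models and returns the top `query.num`:

```scala
override
def serve(query: Query,
  predictedResults: Seq[PredictedResult]): PredictedResult = {
  // average each item's score across all models, then keep the top num
  val combined = predictedResults
    .flatMap(_.itemScores)
    .groupBy(_.item)
    .map { case (item, scores) =>
      ItemScore(item, scores.map(_.score).sum / scores.size)
    }
    .toArray
    .sortBy(-_.score)
    .take(query.num)
  PredictedResult(combined)
}
```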


Now you should have a good understanding of the DASE model. We will show you an
example of customizing the Data Preparator to exclude certain items from your
training set.

#### [Next: Reading Custom Events](reading-custom-events.html)