| --- |
| title: Implementing DASE |
| --- |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| This section gives you an overview of DASE components and how to implement them. You will find links to some engine templates for more concrete examples. |
| |
| # DataSource |
| |
| DataSource reads and selects useful data from the Event Store (data store of the Event Server) and returns TrainingData. |
| |
| ## readTraining() |
| |
| You need to implement readTraining() of [PDataSource](https://predictionio.apache.org/api/current/#org.apache.predictionio.controller.PDataSource), where you can use the [PEventStore Engine API](https://predictionio.apache.org/api/current/#org.apache.predictionio.data.store.PEventStore$) to read the events and create the TrainingData based on the events. |
| |
| The following code example reads user "view" and "buy" item events, filters specific type of events for future processing and returns TrainingData accordingly. |
| |
| ```scala |
| class DataSource(val dsp: DataSourceParams) |
| extends PDataSource[TrainingData, |
| EmptyEvaluationInfo, Query, EmptyActualResult] { |
| |
| @transient lazy val logger = Logger[this.type] |
| |
| override |
| def readTraining(sc: SparkContext): TrainingData = { |
| |
| val eventsRDD: RDD[Event] = PEventStore.find( |
| appName = dsp.appName, |
| entityType = Some("user"), |
| eventNames = Some(List("view", "buy")), |
| // targetEntityType is optional field of an event. |
| targetEntityType = Some(Some("item")))(sc) |
| .cache() |
| |
| val viewEventsRDD: RDD[ViewEvent] = eventsRDD |
| .filter { event => event.event == "view" } |
| .map { ... } |
| |
| ... |
| |
| new TrainingData(...) |
| } |
| |
| } |
| ``` |
| |
| |
| ## Using PEventStore Engine API |
| |
| Please see [Event Server Overview](https://predictionio.apache.org/datacollection/) to understand [EventAPI](https://predictionio.apache.org/datacollection/eventapi/) and [event modeling](https://predictionio.apache.org/datacollection/eventmodel/). |
| |
| With [PEventStore Engine API](https://predictionio.apache.org/api/current/#org.apache.predictionio.data.store.PEventStore$), you can easily read different events in DataSource and get the information you need. |
| |
| For example, let's say you have events like the following: |
| |
| ```json |
| { |
| "event": "myEvent", |
| "entityType": "user", |
| "entityId": "u0", |
| "targetEntityType": "item", |
| "targetEntityId": "i0", |
| "properties" : { |
| "a" : 3, |
| "b" : "some_string", |
| "c" : ["a", "b", "c"], |
| "d" : [1.2, 3.4, 5.6], |
| "e" : 6 |
| } |
| } |
| ``` |
| |
| Then following code could read these events and extract the properties field of the event and convert it to a `MyEvent` object. |
| |
| ```scala |
| val myEvents: RDD[MyEvent] = PEventStore.find( |
| appName = dsp.appName, |
| entityType = Some("user"), |
| eventNames = Some(List("myEvent")), |
| // targetEntityType is optional field of an event. |
| targetEntityType = Some(Some("item")))(sc) |
| .map { event => |
| try { |
| MyEvent( |
| entityId = event.entityId, |
| targetEntityId = event.targetEntityId.get, |
| a = event.properties.get[Int]("a"), |
| b = event.properties.get[String]("b"), |
| c = event.properties.get[List[String]]("c"), |
| d = event.properties.get[List[Double]]("d"), |
| e = event.properties.getOpt[Int]("e") // use getOpt for optional data |
| ) |
| } catch { |
| case e: Exception => |
| logger.error(s"Cannot convert ${event}. Exception: ${e}.") |
| throw e |
| } |
| } |
| ``` |
| |
| If you have used special events `$set/$unset/$delete` setting entity's properties, you can retrieve it with `PEventStore.aggregateProperties()`. |
| |
| Please see [event modeling](https://predictionio.apache.org/datacollection/eventmodel/) to understand usage of special `$set/$unset/$delete` events. |
| |
| For example, the following code show how you could retrieve properties of the "item" entities: |
| |
| ```scala |
| // create a RDD of (entityID, Item) |
| val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties( |
| appName = dsp.appName, |
| entityType = "item" |
| )(sc).map { case (entityId, properties) => |
| |
| try { |
| val item = Item( |
| a = preopties.get[Int]("a"), |
| b = properties.get[String]("b"), |
| c = properties.get[List[String]]("c"), |
| d = properties.get[List[Double]]("d"), |
| e = properties.getOpt[Int]("e") // use getOpt for optional data |
| ) |
| |
| (entityId, item) |
| } catch { |
| case e: Exception => |
| logger.error(s"Failed to get properties ${properties} of ${entityId}. Exception: ${e}.") |
| throw e |
| } |
| |
| } |
| ``` |
| |
| Example: |
| |
| - [DataSource of Similar Product Template](/templates/similarproduct/dase/#data) |
| |
| # Preparator |
| |
| Preparator is responsible for pre-processing `TrainingData` for any necessary feature selection and data processing tasks and generate `PreparedData` which contains the data the Algorithm needs. |
| |
| A few example usages of Preparator: |
| |
| - Feature extraction |
| - Common pre-processing logic if you have multiple algorithms |
| - For simple cases, the Preparator may simply pass the same `TrainingData` as `PreparedData` for Algorithm. |
| |
| ## prepare() |
| |
| You need to implement the `prepare()` method of [PPrepartor](https://predictionio.apache.org/api/current/#org.apache.predictionio.controller.PPreparator) to perform such tasks. |
| |
| Example: |
| |
| - [Preparator of Leading Scoring Template](/templates/leadscoring/dase/#data): it pre-processes the TrainingData and generate the feature vectors needed for the algorithm. |
| - [Preparator of Similar Product Template](/templates/similarproduct/dase/#data): it simply passes the TrainingData as PreparedData for the algorithm. |
| |
| # Algorithm |
| |
| The two methods of the Algorithm class are train() and predict(): |
| |
| ## train() |
| |
| train() is responsible for training a predictive model. It is called when you |
| run `pio train`. Apache PredictionIO will store this model. |
| |
| ## predict() |
| |
| predict() is responsible for using this model to make prediction. It is called when you send a JSON query to the engine. Note that predict() is called in real time. |
| |
| Apache PredictionIO supports two types of algorithms: |
| |
| - **[P2LAlgorithm](https://predictionio.apache.org/api/current/#org.apache.predictionio.controller.P2LAlgorithm)**: trains a Model which does not contain RDD |
| - **[PAlgorithm](https://predictionio.apache.org/api/current/#org.apache.predictionio.controller.PAlgorithm)**: trains a Model which contains RDD |
| |
| ## P2LAlgorithm |
| |
| For `P2LAlgorithm`, the Model is automatically serialized and persisted by |
| Apache PredictionIO after training. |
| |
| Implementing `IPersistentModel` and `IPersistentModelLoader` is optional for P2LAlgorithm. |
| |
| Example: |
| |
| - [Algorithm of Similar Product Template](/templates/similarproduct/dase/#algorithm) |
| |
| ## PAlgorithm |
| |
| `PAlgorithm` should be used when your Model contains RDD. The model produced by `PAlgorithm` is not persisted by default. To persist the model, you need to do the following: |
| |
| - The Model class should extend the `IPersistentModel` trait and implement the `save()` method for saving the model. The trait `IPersistentModel` requires a type parameter which is the class type of algorithm parameter. |
| - Implement a Model factory object which extends the `IPersistentModelLoader` trait and implement the `apply()` for loading the model. The trait `IPersistentModelLoader` requires two type parameters which are the types of algorithm parameter and the model produced by the algorithm. |
| |
| Example: |
| |
| - [Algorithm of Recommendation Template](/templates/recommendation/dase/#algorithm): it implements PAlgorithm and the IPersistentModel and IPersistentModelLoader. |
| - [Algorithm of Vanilla Template](/templates/vanilla/dase): it walks through example of P2LAlgorithm and PAlgorithm. |
| |
| ## using LEventStore Engine API in predict() |
| |
| You may use [LEventStore.findByEntity()](https://predictionio.apache.org/api/current/#org.apache.predictionio.data.store.LEventStore$) to retrieve events of a specific entity. For example, retrieve recent events of the user specified in the query) and use these recent events to make prediction in real time. |
| |
| |
| For example, the following code reads the recent 10 view events of `query.user`: |
| |
| ```scala |
| val recentEvents = try { |
| LEventStore.findByEntity( |
| appName = ap.appName, |
| // entityType and entityId is specified for fast lookup |
| entityType = "user", |
| entityId = query.user, |
| eventNames = Some(List("view")), |
| targetEntityType = Some(Some("item")), |
| limit = Some(10), |
| latest = true, |
| // set time limit to avoid super long DB access |
| timeout = Duration(200, "millis") |
| ) |
| } catch { |
| case e: scala.concurrent.TimeoutException => |
| logger.error(s"Timeout when read recent events." + |
| s" Empty list is used. ${e}") |
| Iterator[Event]() |
| case e: Exception => |
| logger.error(s"Error when read recent events: ${e}") |
| throw e |
| } |
| ``` |
| |
| |
| Example: |
| |
| - [Algorithm of E-Commerce Recommendation template](/templates/ecommercerecommendation/dase#algorithm): LEventStore.findByEntity() is used to retrieve all items seen by the user and filter them from recommendation in predict(). |
| |
| |
| # Serving |
| |
| ## serve() |
| |
| You need to implement the serve() method of the class [LServing](https://predictionio.apache.org/api/current/#org.apache.predictionio.controller.LServing). The serve() method processes predicted result. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. |
| |
| Example: |
| |
| - [Serving of Similar Product Template](/templates/similarproduct/dase/#serving): It simply returns the predicted result |
| - [Serving of multi-algorithm examples of Similar Product Template](/templates/similarproduct/multi-events-multi-algos/): It combines the result of multiple algorithms and return |