---
title: Batch Persistable Evaluator (Recommendation)
---
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
This how-to tutorial explains how you can also use `pio eval` to persist the predicted results of a batch of queries. Please read [Evaluation](/templates/recommendation/evaluation/) first to understand the usage of DataSource's `readEval()` and the Evaluation component.

WARNING: This tutorial is based on some experimental and developer features, which may change in future releases.

NOTE: This tutorial is based on Recommendation template version v0.3.2.
## 1. Modify DataSource
Modify DataSource's `readEval()` to generate the batch of queries for which you want to run batch prediction.
```scala
override
def readEval(sc: SparkContext)
: Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
  // This function returns only one evaluation data set.
  // Create your own queries here. The ones below are provided as examples;
  // for instance, you may collect all distinct user ids from the
  // trainingData to create the queries.
  val batchQueries: RDD[Query] = sc.parallelize(
    Seq(
      Query(user = "1", num = 10),
      Query(user = "3", num = 15),
      Query(user = "5", num = 20)
    )
  )

  val queryAndActual: RDD[(Query, ActualResult)] = batchQueries.map { q =>
    // The ActualResult contains a dummy empty rating array because we are
    // not interested in the actual result for batch predict purposes.
    (q, ActualResult(Array()))
  }

  val evalDataSet = (
    readTraining(sc),
    new EmptyEvaluationInfo(),
    queryAndActual
  )

  Seq(evalDataSet)
}
```
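As the code comment suggests, you can also derive the queries from the training data instead of hard-coding them. Below is a minimal sketch, assuming the template's `TrainingData(ratings: RDD[Rating])` and `Rating(user, item, rating)` classes; it would replace the hard-coded `batchQueries` inside `readEval()`:

```scala
// Sketch: build one Query per distinct user id seen in the training data.
// Assumes TrainingData exposes ratings: RDD[Rating], as in this template.
val trainingData = readTraining(sc)
val batchQueries: RDD[Query] = trainingData.ratings
  .map(_.user)
  .distinct()
  .map(user => Query(user = user, num = 10))
```

If you take this approach, reuse the same `trainingData` value in the `evalDataSet` tuple so the training data is not read twice.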
NOTE: Alternatively, you can create a new DataSource that extends the original DataSource. You can then register the new class in Engine.scala and specify which one to use in engine.json, as sketched below.
<!-- TODO add more details -->
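For illustration, here is a minimal sketch of that alternative. The class name `BatchDataSource` is hypothetical, and the sketch assumes the `Engine` constructor overload that takes name-to-class maps, which lets engine.json select a data source by name (e.g. `"datasource": {"name": "batch"}`):

```scala
package org.template.recommendation

import org.apache.predictionio.controller.EmptyEvaluationInfo
import org.apache.predictionio.controller.Engine
import org.apache.predictionio.controller.EngineFactory
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical subclass: readTraining() is inherited from DataSource;
// only readEval() is overridden to produce the batch queries.
class BatchDataSource(dsp: DataSourceParams) extends DataSource(dsp) {
  override
  def readEval(sc: SparkContext)
  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
    val batchQueries: RDD[Query] = sc.parallelize(Seq(
      Query(user = "1", num = 10),
      Query(user = "3", num = 15)))
    val queryAndActual = batchQueries.map(q => (q, ActualResult(Array())))
    Seq((readTraining(sc), new EmptyEvaluationInfo(), queryAndActual))
  }
}

// In Engine.scala, register both data sources under different names.
// The empty-string name is the default used when engine.json does not
// specify one.
object RecommendationEngine extends EngineFactory {
  def apply() = {
    new Engine(
      Map("" -> classOf[DataSource],
        "batch" -> classOf[BatchDataSource]),
      Map("" -> classOf[Preparator]),
      Map("als" -> classOf[ALSAlgorithm]),
      Map("" -> classOf[Serving]))
  }
}
```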
## 2. Add a new Evaluator
Create a new file `BatchPersistableEvaluator.scala`. Unlike the `MetricEvaluator`, this Evaluator simply writes the Query and corresponding PredictedResult to the output directory without performing any metrics calculation.
Note that the output directory is specified by the variable `outputDir`.
```scala
package org.template.recommendation

import org.apache.predictionio.controller.EmptyEvaluationInfo
import org.apache.predictionio.controller.Engine
import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.Evaluation
import org.apache.predictionio.controller.Params
import org.apache.predictionio.core.BaseEvaluator
import org.apache.predictionio.core.BaseEvaluatorResult
import org.apache.predictionio.workflow.WorkflowParams

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import org.json4s.DefaultFormats
import org.json4s.Formats
import org.json4s.native.Serialization

import grizzled.slf4j.Logger

class BatchPersistableEvaluatorResult extends BaseEvaluatorResult {}

class BatchPersistableEvaluator extends BaseEvaluator[
    EmptyEvaluationInfo,
    Query,
    PredictedResult,
    ActualResult,
    BatchPersistableEvaluatorResult] {
  @transient lazy val logger = Logger[this.type]

  // A helper object for the json4s serialization
  case class Row(query: Query, predictedResult: PredictedResult)
    extends Serializable

  def evaluateBase(
    sc: SparkContext,
    evaluation: Evaluation,
    engineEvalDataSet: Seq[(
      EngineParams,
      Seq[(EmptyEvaluationInfo, RDD[(Query, PredictedResult, ActualResult)])])],
    params: WorkflowParams): BatchPersistableEvaluatorResult = {
    /** Extract the first data set, as we are only interested in the first
      * evaluation. It is possible to relax this restriction and have the
      * output logic below write to a different directory for each set of
      * engine params.
      */
    require(
      engineEvalDataSet.size == 1, "There should be only one engine params")

    val evalDataSet = engineEvalDataSet.head._2
    require(evalDataSet.size == 1, "There should be only one RDD[(Q, P, A)]")

    val qpaRDD = evalDataSet.head._2

    // qpaRDD contains the 3 queries we specified in readEval, the
    // corresponding predicted results, and the dummy actual results.

    /** The output directory. It is better to use an absolute path if you run
      * on a cluster. If your database has a Hadoop interface, you can also
      * convert the following to write to your database in parallel.
      */
    val outputDir = "batch_result"

    logger.info("Writing result to disk")
    qpaRDD
      .map { case (q, p, a) => Row(q, p) }
      .map { row =>
        // Convert into a json
        implicit val formats: Formats = DefaultFormats
        Serialization.write(row)
      }
      .saveAsTextFile(outputDir)

    logger.info(s"Result can be found in $outputDir")

    new BatchPersistableEvaluatorResult()
  }
}
```
## 3. Define Evaluation and EngineParamsGenerator object
Create a new file `BatchEvaluation.scala`. Note that the new `BatchPersistableEvaluator` is used. The `BatchEngineParamsList` specifies the parameters of the engine.
Modify the appName parameter in `DataSourceParams` to match your app name.
```scala
package org.template.recommendation

import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.controller.Evaluation

object BatchEvaluation extends Evaluation {
  // Define the Engine and Evaluator used in this Evaluation.
  // Note that we specify the new BatchPersistableEvaluator here.
  engineEvaluator =
    (RecommendationEngine(), new BatchPersistableEvaluator())
}

object BatchEngineParamsList extends EngineParamsGenerator {
  // We are only interested in a single set of engine params.
  engineParamsList = Seq(
    EngineParams(
      dataSourceParams =
        DataSourceParams(appName = "INVALID_APP_NAME", evalParams = None),
      algorithmParamsList = Seq(("als", ALSAlgorithmParams(
        rank = 10,
        numIterations = 20,
        lambda = 0.01,
        seed = Some(3L))))))
}
```
## 4. Build and run

Run the following command to build:
```
$ pio build
```
After the build succeeds, you should see the following output:
```
[INFO] [Console$] Your engine is ready for training.
```
To run the `BatchEvaluation` with `BatchEngineParamsList`, run the following command:
```
$ pio eval org.template.recommendation.BatchEvaluation org.template.recommendation.BatchEngineParamsList
```
You should see the following output:
```
[INFO] [BatchPersistableEvaluator] Writing result to disk
[INFO] [BatchPersistableEvaluator] Result can be found in batch_result
[INFO] [CoreWorkflow$] Updating evaluation instance with result: org.template.recommendation.BatchPersistableEvaluatorResult@2f886889
[INFO] [CoreWorkflow$] runEvaluation completed
```
You should find the batch queries and the predicted results in the output directory `batch_result/`.
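Each part file under `batch_result/` (e.g. `batch_result/part-00000`) contains one JSON-serialized `Row` per line. An illustrative line (the item ids and scores below are made up) would look like this:

```
{"query":{"user":"1","num":10},"predictedResult":{"itemScores":[{"item":"22","score":4.07},{"item":"62","score":3.81}]}}
```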