| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.predictionio.examples.recommendation |
| |
| import org.apache.predictionio.controller.PPreparator |
| |
| import org.apache.spark.SparkContext |
| import org.apache.spark.SparkContext._ |
| import org.apache.spark.rdd.RDD |
| |
| import scala.io.Source // ADDED |
| import org.apache.predictionio.controller.Params // ADDED |
| |
| // ADDED CustomPreparatorParams case class |
/**
 * Parameters for [[Preparator]].
 *
 * @param filepath path of the file that `Preparator.prepare` reads line by line
 *                 to obtain the items to leave out of the prepared ratings
 */
case class CustomPreparatorParams(filepath: String) extends Params
| |
/**
 * Preparator that removes ratings for a configurable set of items before the
 * data is handed on as [[PreparedData]].
 *
 * @param pp parameters supplying the path of the item exclusion file
 */
class Preparator(pp: CustomPreparatorParams) // ADDED CustomPreparatorParams
  extends PPreparator[TrainingData, PreparedData] {

  /**
   * Builds the prepared data set by dropping every rating whose item appears
   * in the exclusion file at `pp.filepath`.
   *
   * The file is read one entry per line; lines are used verbatim (no trimming),
   * so an entry must match `Rating.item` exactly to be excluded.
   *
   * @param sc           the Spark context (not used by this implementation)
   * @param trainingData the training data whose ratings are filtered
   * @return a [[PreparedData]] holding only the ratings whose item is not listed
   *         in the exclusion file
   */
  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    val exclusionSource = Source.fromFile(pp.filepath)
    // Materialise the lines into a Set before closing: getLines() is lazy, and
    // the underlying file handle must be released even if reading fails.
    val noTrainItems =
      try exclusionSource.getLines().toSet // CHANGED
      finally exclusionSource.close()

    // The set is captured by the filter closure applied to the ratings RDD.
    val ratings = trainingData.ratings.filter { r =>
      !noTrainItems.contains(r.item)
    }
    new PreparedData(ratings)
  }
}
| |
/**
 * Serializable container for the output of data preparation.
 *
 * @param ratings the ratings RDD produced by the preparator
 */
class PreparedData(val ratings: RDD[Rating]) extends Serializable