| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.predictionio.tools.export |
| |
| import org.apache.predictionio.controller.Utils |
| import org.apache.predictionio.data.storage.EventJson4sSupport |
| import org.apache.predictionio.data.storage.Storage |
| import org.apache.predictionio.tools.Runner |
| import org.apache.predictionio.workflow.WorkflowContext |
| import org.apache.predictionio.workflow.WorkflowUtils |
| import org.apache.predictionio.workflow.CleanupFunctions |
| import grizzled.slf4j.Logging |
| import org.apache.spark.sql.{SaveMode, SparkSession} |
| import org.json4s.native.Serialization._ |
| |
/** Command-line options for the [[EventsToFile]] export tool.
  *
  * @param env        environment string; parsed with `Runner.envStringToMap`
  *                   and handed to the Spark executors as `executorEnv`
  * @param logFile    log file path (parsed but not referenced in this file —
  *                   NOTE(review): presumably consumed by shared launcher
  *                   tooling; confirm)
  * @param appId      id of the app whose events are exported
  * @param channel    optional channel name; resolved to a channel id before
  *                   the export runs (exits if the name is unknown)
  * @param outputPath destination path the exported events are written to
  * @param format     output format: `"json"` writes text files, any other
  *                   value (default `"parquet"`) is written as Parquet
  * @param verbose    enables verbose logging via `WorkflowUtils.modifyLogging`
  * @param debug      debug flag (parsed but not referenced in this file)
  */
case class EventsToFileArgs(
  env: String = "",
  logFile: String = "",
  appId: Int = 0,
  channel: Option[String] = None,
  outputPath: String = "",
  format: String = "parquet",
  verbose: Boolean = false,
  debug: Boolean = false)
| |
object EventsToFile extends Logging {
  /** Entry point: exports all events of one app (optionally restricted to one
    * channel) to `outputPath`, either as JSON text files or as Parquet.
    * Exits with status 1 if the requested channel does not exist.
    */
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
      opt[String]("env") action { (x, c) =>
        c.copy(env = x)
      }
      opt[String]("log-file") action { (x, c) =>
        c.copy(logFile = x)
      }
      opt[Int]("appid") action { (x, c) =>
        c.copy(appId = x)
      }
      opt[String]("channel") action { (x, c) =>
        c.copy(channel = Some(x))
      }
      opt[String]("format") action { (x, c) =>
        c.copy(format = x)
      }
      opt[String]("output") action { (x, c) =>
        c.copy(outputPath = x)
      }
      // Flag options carry no value: discard the unused Unit argument with `_`.
      opt[Unit]("verbose") action { (_, c) =>
        c.copy(verbose = true)
      }
      opt[Unit]("debug") action { (_, c) =>
        c.copy(debug = true)
      }
    }
    // `foreach`, not `map`: the body runs only for its side effects and the
    // Option[Unit] result was being discarded. The parsed-args binding is
    // renamed so it no longer shadows the raw `args: Array[String]` parameter.
    parser.parse(args, EventsToFileArgs()) foreach { parsedArgs =>
      try {
        // Resolve the optional channel name to its id for this app.
        val channels = Storage.getMetaDataChannels
        val channelMap =
          channels.getByAppid(parsedArgs.appId).map(c => (c.name, c.id)).toMap

        val channelId: Option[Int] = parsedArgs.channel.map { ch =>
          if (!channelMap.contains(ch)) {
            error(s"Channel ${ch} doesn't exist in this app.")
            sys.exit(1)
          }

          channelMap(ch)
        }

        val channelStr = parsedArgs.channel.map(n => " Channel " + n).getOrElse("")

        WorkflowUtils.modifyLogging(verbose = parsedArgs.verbose)
        // json4s formats used by `write(_)` below; @transient lazy so the
        // formats are not serialized with the closure — presumably to keep the
        // Spark task closure serializable (NOTE(review): confirm intent).
        @transient lazy implicit val formats = Utils.json4sDefaultFormats +
          new EventJson4sSupport.APISerializer
        val sc = WorkflowContext(
          mode = "Export",
          batch = "App ID " + parsedArgs.appId + channelStr,
          executorEnv = Runner.envStringToMap(parsedArgs.env))
        val sqlSession = SparkSession.builder().getOrCreate()
        val events = Storage.getPEvents()
        val eventsRdd = events.find(appId = parsedArgs.appId, channelId = channelId)(sc)
        // Serialize each event to a JSON string via the implicit formats.
        val jsonStringRdd = eventsRdd.map(write(_))
        if (parsedArgs.format == "json") {
          jsonStringRdd.saveAsTextFile(parsedArgs.outputPath)
        } else {
          // Any format other than "json" falls through to Parquet (the default).
          val jsonDf = sqlSession.read.json(jsonStringRdd)
          jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(parsedArgs.outputPath)
        }
        info(s"Events are exported to ${parsedArgs.outputPath}/.")
        info("Done.")

      } finally {
        // Run registered cleanup hooks whether the export succeeded or threw.
        CleanupFunctions.run()
      }
    }
  }
}