blob: a8fa746b950f6d6fddb50b43dff903a79a69a1b4 [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.tools.export
import io.prediction.controller.Utils
import io.prediction.data.storage.EventJson4sSupport
import io.prediction.data.storage.Storage
import io.prediction.tools.Runner
import io.prediction.workflow.WorkflowContext
import io.prediction.workflow.WorkflowUtils
import grizzled.slf4j.Logging
import org.apache.spark.sql.SQLContext
import org.json4s.native.Serialization._
/** Command-line settings for the `EventsToFile` export tool.
  *
  * Every field has a default, so `EventsToFileArgs()` is the starting value
  * that the option parser refines with `copy`.
  *
  * @param env        raw environment string (`--env`); converted with
  *                   `Runner.envStringToMap` and handed to the workflow
  *                   context as the executor environment
  * @param logFile    value of `--log-file`; stored but not read anywhere in
  *                   this file
  * @param appId      ID of the app whose events are exported (`--appid`)
  * @param outputPath destination path of the export (`--output`)
  * @param format     output format (`--format`); `"json"` writes JSON text
  *                   files, every other value writes a Parquet file
  * @param verbose    set by `--verbose`; forwarded to
  *                   `WorkflowUtils.modifyLogging`
  * @param debug      set by `--debug`; stored but not read anywhere in this
  *                   file
  */
case class EventsToFileArgs(
    env: String = "",
    logFile: String = "",
    appId: Int = 0,
    outputPath: String = "",
    format: String = "parquet",
    verbose: Boolean = false,
    debug: Boolean = false
)
/** Command-line tool that exports the events of one app to files.
  *
  * Events are fetched through `Storage.getPEvents()`, serialised to JSON
  * strings and written to the `--output` path, either as JSON text files
  * (`--format json`) or as a Parquet file (the default).
  */
object EventsToFile extends Logging {

  /** Parses the command line and runs the export.
    *
    * Recognised options are `--env`, `--log-file`, `--appid`, `--format`,
    * `--output`, `--verbose` and `--debug`. When the arguments cannot be
    * parsed, nothing is exported.
    *
    * @param args raw command-line arguments
    */
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
      opt[String]("env") action { (x, c) => c.copy(env = x) }
      opt[String]("log-file") action { (x, c) => c.copy(logFile = x) }
      opt[Int]("appid") action { (x, c) => c.copy(appId = x) }
      opt[String]("format") action { (x, c) => c.copy(format = x) }
      opt[String]("output") action { (x, c) => c.copy(outputPath = x) }
      opt[Unit]("verbose") action { (_, c) => c.copy(verbose = true) }
      opt[Unit]("debug") action { (_, c) => c.copy(debug = true) }
    }

    // `foreach`, not `map`: the export runs purely for its side effects.
    parser.parse(args, EventsToFileArgs()) foreach { parsed =>
      WorkflowUtils.modifyLogging(verbose = parsed.verbose)

      // Implicit json4s formats consumed by `write` inside the RDD closure
      // below. The declaration is deliberately kept as a local
      // `@transient lazy` value.
      // NOTE(review): presumably lazy so that the formats are instantiated
      // where the closure runs rather than serialised with it -- confirm
      // before changing this declaration.
      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
        new EventJson4sSupport.APISerializer

      val sc = WorkflowContext(
        mode = "Export",
        batch = s"App ID ${parsed.appId}",
        executorEnv = Runner.envStringToMap(parsed.env))

      // Always release the Spark context, even when the export fails.
      try {
        val sqlContext = new SQLContext(sc)
        val events = Storage.getPEvents()
        val eventsRdd = events.find(appId = parsed.appId)(sc)
        val jsonStringRdd = eventsRdd.map(write(_))

        parsed.format match {
          case "json" =>
            jsonStringRdd.saveAsTextFile(parsed.outputPath)
          case other =>
            // Any non-"json" value has always produced Parquet; keep that
            // behaviour but make an unexpected value visible in the log.
            if (other != "parquet") {
              warn(s"Unrecognized format '$other'; exporting as parquet.")
            }
            val jsonRdd = sqlContext.jsonRDD(jsonStringRdd)
            info(jsonRdd.schemaString)
            jsonRdd.saveAsParquetFile(parsed.outputPath)
        }
      } finally {
        sc.stop()
      }
    }
  }
}