// blob: f3c98d659706c21eb75dc2b4eb527103d30c2562 [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.tools.imprt
import io.prediction.controller.Utils
import io.prediction.data.storage.Event
import io.prediction.data.storage.EventJson4sSupport
import io.prediction.data.storage.Storage
import io.prediction.tools.Runner
import io.prediction.workflow.WorkflowContext
import io.prediction.workflow.WorkflowUtils
import grizzled.slf4j.Logging
import org.json4s.native.Serialization._
/** Option values collected from the command line of [[FileToEvents]].
  *
  * Every field has a default, so `FileToEventsArgs()` is the starting value
  * that the option parser refines with `copy`.
  *
  * @param env       value of `--env`; handed to `Runner.envStringToMap` to build
  *                  the executor environment
  * @param logFile   value of `--log-file` (not read by `FileToEvents.main` itself)
  * @param appId     value of `--appid`; the app the events are written to
  * @param channel   value of `--channel`, if given; name of the target channel
  * @param inputPath value of `--input`; path passed to `SparkContext.textFile`
  * @param verbose   set by `--verbose`; forwarded to `WorkflowUtils.modifyLogging`
  * @param debug     set by `--debug` (not read by `FileToEvents.main` itself)
  */
case class FileToEventsArgs(
    env: String = "",
    logFile: String = "",
    appId: Int = 0,
    channel: Option[String] = None,
    inputPath: String = "",
    verbose: Boolean = false,
    debug: Boolean = false
)
/** Command-line entry point that imports events from a text file into the
  * event store.
  *
  * Every non-blank line of the input is deserialized as one JSON-encoded
  * `Event` and written, through a Spark job, to the given app and, optionally,
  * to one of its channels.
  */
object FileToEvents extends Logging {
  /** Parses the command-line options and runs the import.
    *
    * Recognized options: `--env`, `--log-file`, `--appid`, `--channel`,
    * `--input`, `--verbose` and `--debug`. `--log-file` and `--debug` are
    * parsed but not read here. If the options cannot be parsed, nothing is
    * imported. If `--channel` names a channel that does not exist for the
    * app, an error is logged and the JVM exits with status 1.
    *
    * @param args raw command-line arguments
    */
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[FileToEventsArgs]("FileToEvents") {
      opt[String]("env") action { (x, c) =>
        c.copy(env = x)
      }
      opt[String]("log-file") action { (x, c) =>
        c.copy(logFile = x)
      }
      opt[Int]("appid") action { (x, c) =>
        c.copy(appId = x)
      }
      opt[String]("channel") action { (x, c) =>
        c.copy(channel = Some(x))
      }
      opt[String]("input") action { (x, c) =>
        c.copy(inputPath = x)
      }
      opt[Unit]("verbose") action { (x, c) =>
        c.copy(verbose = true)
      }
      opt[Unit]("debug") action { (x, c) =>
        c.copy(debug = true)
      }
    }

    // The body only has side effects, so iterate over the parse result
    // instead of mapping it and discarding the value.
    parser.parse(args, FileToEventsArgs()) foreach { parsed =>
      // Resolve the optional channel name to its id within the app.
      val channels = Storage.getMetaDataChannels
      val channelMap = channels.getByAppid(parsed.appId).map(c => (c.name, c.id)).toMap
      val channelId: Option[Int] = parsed.channel.map { ch =>
        // Single lookup; the default branch never returns.
        channelMap.getOrElse(ch, {
          error(s"Channel ${ch} doesn't exist in this app.")
          sys.exit(1)
        })
      }
      val channelStr = parsed.channel.map(n => " Channel " + n).getOrElse("")

      WorkflowUtils.modifyLogging(verbose = parsed.verbose)

      // Implicit json4s formats required by `read[Event]` below.
      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
        new EventJson4sSupport.APISerializer

      val sc = WorkflowContext(
        mode = "Import",
        batch = "App ID " + parsed.appId + channelStr,
        executorEnv = Runner.envStringToMap(parsed.env))
      try {
        // Skip blank lines: they are not JSON and would otherwise make the
        // whole import fail with a parse error.
        val rdd = sc.textFile(parsed.inputPath).filter(_.trim.nonEmpty)
        val events = Storage.getPEvents()
        events.write(events = rdd.map(read[Event](_)),
          appId = parsed.appId,
          channelId = channelId)(sc)
        info("Events are imported.")
        info("Done.")
      } finally {
        // Always release the Spark context, even when the import fails.
        sc.stop()
      }
    }
  }
}