blob: 6f8aeddeb13ad89ddc9d2c7ea6d880c746e925f5 [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.tools.console
import java.io.File
import grizzled.slf4j.Logging
import io.prediction.controller.Utils
import io.prediction.core.BuildInfo
import io.prediction.data.api.EventServer
import io.prediction.data.api.EventServerConfig
import io.prediction.data.storage.EngineManifest
import io.prediction.data.storage.EngineManifestSerializer
import io.prediction.data.storage.Storage
import io.prediction.data.storage.hbase.upgrade.Upgrade_0_8_3
import io.prediction.tools.admin.{AdminServer, AdminServerConfig}
import io.prediction.tools.RegisterEngine
import io.prediction.tools.RunServer
import io.prediction.tools.RunWorkflow
import io.prediction.tools.dashboard.Dashboard
import io.prediction.tools.dashboard.DashboardConfig
import io.prediction.workflow.WorkflowUtils
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
import semverfi._
import scala.collection.JavaConversions._
import scala.io.Source
import scala.sys.process._
import scala.util.Random
import scalaj.http.Http
case class ConsoleArgs(
common: CommonArgs = CommonArgs(),
build: BuildArgs = BuildArgs(),
app: AppArgs = AppArgs(),
accessKey: AccessKeyArgs = AccessKeyArgs(),
deploy: DeployArgs = DeployArgs(),
eventServer: EventServerArgs = EventServerArgs(),
adminServer: AdminServerArgs = AdminServerArgs(),
dashboard: DashboardArgs = DashboardArgs(),
upgrade: UpgradeArgs = UpgradeArgs(),
template: TemplateArgs = TemplateArgs(),
export: ExportArgs = ExportArgs(),
imprt: ImportArgs = ImportArgs(),
commands: Seq[String] = Seq(),
metricsClass: Option[String] = None,
metricsParamsJsonPath: Option[String] = None,
paramsPath: String = "params",
engineInstanceId: Option[String] = None,
mainClass: Option[String] = None)
case class CommonArgs(
batch: String = "",
sparkPassThrough: Seq[String] = Seq(),
driverPassThrough: Seq[String] = Seq(),
pioHome: Option[String] = None,
sparkHome: Option[String] = None,
engineId: Option[String] = None,
engineVersion: Option[String] = None,
engineFactory: Option[String] = None,
engineParamsKey: Option[String] = None,
evaluation: Option[String] = None,
engineParamsGenerator: Option[String] = None,
variantJson: File = new File("engine.json"),
manifestJson: File = new File("manifest.json"),
stopAfterRead: Boolean = false,
stopAfterPrepare: Boolean = false,
skipSanityCheck: Boolean = false,
verbose: Boolean = false,
verbosity: Int = 0,
sparkKryo: Boolean = false)
case class BuildArgs(
sbt: Option[File] = None,
sbtExtra: Option[String] = None,
sbtAssemblyPackageDependency: Boolean = true,
sbtClean: Boolean = false,
uberJar: Boolean = false,
forceGeneratePIOSbt: Boolean = false)
case class DeployArgs(
ip: String = "0.0.0.0",
port: Int = 8000,
logUrl: Option[String] = None,
logPrefix: Option[String] = None)
case class EventServerArgs(
enabled: Boolean = false,
ip: String = "0.0.0.0",
port: Int = 7070,
stats: Boolean = false)
case class AdminServerArgs(
ip: String = "localhost",
port: Int = 7071)
case class DashboardArgs(
ip: String = "0.0.0.0",
port: Int = 9000)
case class UpgradeArgs(
from: String = "0.0.0",
to: String = "0.0.0",
oldAppId: Int = 0,
newAppId: Int = 0
)
object Console extends Logging {
def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[ConsoleArgs]("pio") {
override def showUsageOnError: Boolean = false
head("PredictionIO Command Line Interface Console", BuildInfo.version)
help("")
note("Note that it is possible to supply pass-through arguments at\n" +
"the end of the command by using a '--' separator, e.g.\n\n" +
"pio train --params-path params -- --master spark://mycluster:7077\n" +
"\nIn the example above, the '--master' argument will be passed to\n" +
"underlying spark-submit command. Please refer to the usage section\n" +
"for each command for more information.\n\n" +
"The following options are common to all commands:\n")
opt[String]("pio-home") action { (x, c) =>
c.copy(common = c.common.copy(pioHome = Some(x)))
} text("Root directory of a PredictionIO installation.\n" +
" Specify this if automatic discovery fail.")
opt[String]("spark-home") action { (x, c) =>
c.copy(common = c.common.copy(sparkHome = Some(x)))
} text("Root directory of an Apache Spark installation.\n" +
" If not specified, will try to use the SPARK_HOME\n" +
" environmental variable. If this fails as well, default to\n" +
" current directory.")
opt[String]("engine-id") abbr("ei") action { (x, c) =>
c.copy(common = c.common.copy(engineId = Some(x)))
} text("Specify an engine ID. Usually used by distributed deployment.")
opt[String]("engine-version") abbr("ev") action { (x, c) =>
c.copy(common = c.common.copy(engineVersion = Some(x)))
} text("Specify an engine version. Usually used by distributed " +
"deployment.")
opt[File]("variant") abbr("v") action { (x, c) =>
c.copy(common = c.common.copy(variantJson = x))
} validate { x =>
if (x.exists) {
success
} else {
failure(s"${x.getCanonicalPath} does not exist.")
}
} text("Path to an engine variant JSON file. Default: engine.json")
opt[File]("manifest") abbr("m") action { (x, c) =>
c.copy(common = c.common.copy(manifestJson = x))
} validate { x =>
if (x.exists) {
success
} else {
failure(s"${x.getCanonicalPath} does not exist.")
}
} text("Path to an engine manifest JSON file. Default: manifest.json")
opt[File]("sbt") action { (x, c) =>
c.copy(build = c.build.copy(sbt = Some(x)))
} validate { x =>
if (x.exists) {
success
} else {
failure(s"${x.getCanonicalPath} does not exist.")
}
} text("Path to sbt. Default: sbt")
opt[Unit]("verbose") action { (x, c) =>
c.copy(common = c.common.copy(verbose = true))
}
opt[Unit]("spark-kryo") abbr("sk") action { (x, c) =>
c.copy(common = c.common.copy(sparkKryo = true))
}
note("")
cmd("version").
text("Displays the version of this command line console.").
action { (_, c) =>
c.copy(commands = c.commands :+ "version")
}
note("")
cmd("help").action { (_, c) =>
c.copy(commands = c.commands :+ "help")
} children(
arg[String]("<command>") optional()
action { (x, c) =>
c.copy(commands = c.commands :+ x)
}
)
note("")
cmd("build").
text("Build an engine at the current directory.").
action { (_, c) =>
c.copy(commands = c.commands :+ "build")
} children(
opt[String]("sbt-extra") action { (x, c) =>
c.copy(build = c.build.copy(sbtExtra = Some(x)))
} text("Extra command to pass to SBT when it builds your engine."),
opt[Unit]("clean") action { (x, c) =>
c.copy(build = c.build.copy(sbtClean = true))
} text("Clean build."),
opt[Unit]("no-asm") action { (x, c) =>
c.copy(build = c.build.copy(sbtAssemblyPackageDependency = false))
} text("Skip building external dependencies assembly."),
opt[Unit]("uber-jar") action { (x, c) =>
c.copy(build = c.build.copy(uberJar = true))
},
opt[Unit]("generate-pio-sbt") action { (x, c) =>
c.copy(build = c.build.copy(forceGeneratePIOSbt = true))
}
)
note("")
cmd("unregister").
text("Unregister an engine at the current directory.").
action { (_, c) =>
c.copy(commands = c.commands :+ "unregister")
}
note("")
cmd("train").
text("Kick off a training using an engine. This will produce an\n" +
"engine instance. This command will pass all pass-through\n" +
"arguments to its underlying spark-submit command.").
action { (_, c) =>
c.copy(commands = c.commands :+ "train")
} children(
opt[String]("batch") action { (x, c) =>
c.copy(common = c.common.copy(batch = x))
} text("Batch label of the run."),
opt[String]("params-path") action { (x, c) =>
c.copy(paramsPath = x)
} text("Directory to lookup parameters JSON files. Default: params"),
opt[String]("metrics-params") abbr("mp") action { (x, c) =>
c.copy(metricsParamsJsonPath = Some(x))
} text("Metrics parameters JSON file. Will try to use\n" +
" metrics.json in the base path."),
opt[Unit]("skip-sanity-check") abbr("ssc") action { (x, c) =>
c.copy(common = c.common.copy(skipSanityCheck = true))
},
opt[Unit]("stop-after-read") abbr("sar") action { (x, c) =>
c.copy(common = c.common.copy(stopAfterRead = true))
},
opt[Unit]("stop-after-prepare") abbr("sap") action { (x, c) =>
c.copy(common = c.common.copy(stopAfterPrepare = true))
},
opt[Unit]("uber-jar") action { (x, c) =>
c.copy(build = c.build.copy(uberJar = true))
},
opt[Int]("verbosity") action { (x, c) =>
c.copy(common = c.common.copy(verbosity = x))
},
opt[String]("engine-factory") action { (x, c) =>
c.copy(common = c.common.copy(engineFactory = Some(x)))
},
opt[String]("engine-params-key") action { (x, c) =>
c.copy(common = c.common.copy(engineParamsKey = Some(x)))
}
)
note("")
cmd("eval").
text("Kick off an evaluation using an engine. This will produce an\n" +
"engine instance. This command will pass all pass-through\n" +
"arguments to its underlying spark-submit command.").
action { (_, c) =>
c.copy(commands = c.commands :+ "eval")
} children(
arg[String]("<evaluation-class>") action { (x, c) =>
c.copy(common = c.common.copy(evaluation = Some(x)))
},
arg[String]("[<engine-parameters-generator-class>]") optional() action { (x, c) =>
c.copy(common = c.common.copy(engineParamsGenerator = Some(x)))
} text("Optional engine parameters generator class, overriding the first argument"),
opt[String]("batch") action { (x, c) =>
c.copy(common = c.common.copy(batch = x))
} text("Batch label of the run.")
)
note("")
cmd("deploy").
text("Deploy an engine instance as a prediction server. This\n" +
"command will pass all pass-through arguments to its underlying\n" +
"spark-submit command.").
action { (_, c) =>
c.copy(commands = c.commands :+ "deploy")
} children(
opt[String]("batch") action { (x, c) =>
c.copy(common = c.common.copy(batch = x))
} text("Batch label of the deployment."),
opt[String]("engine-instance-id") action { (x, c) =>
c.copy(engineInstanceId = Some(x))
} text("Engine instance ID."),
opt[String]("ip") action { (x, c) =>
c.copy(deploy = c.deploy.copy(ip = x))
},
opt[Int]("port") action { (x, c) =>
c.copy(deploy = c.deploy.copy(port = x))
} text("Port to bind to. Default: 8000"),
opt[Unit]("feedback") action { (_, c) =>
c.copy(eventServer = c.eventServer.copy(enabled = true))
} text("Enable feedback loop to event server."),
opt[String]("event-server-ip") action { (x, c) =>
c.copy(eventServer = c.eventServer.copy(ip = x))
},
opt[Int]("event-server-port") action { (x, c) =>
c.copy(eventServer = c.eventServer.copy(port = x))
} text("Event server port. Default: 7070"),
opt[Int]("admin-server-port") action { (x, c) =>
c.copy(adminServer = c.adminServer.copy(port = x))
} text("Admin server port. Default: 7071"),
opt[String]("admin-server-port") action { (x, c) =>
c.copy(adminServer = c.adminServer.copy(ip = x))
} text("Admin server IP. Default: localhost"),
opt[String]("accesskey") action { (x, c) =>
c.copy(accessKey = c.accessKey.copy(accessKey = x))
} text("Access key of the App where feedback data will be stored."),
opt[Unit]("uber-jar") action { (x, c) =>
c.copy(build = c.build.copy(uberJar = true))
},
opt[String]("log-url") action { (x, c) =>
c.copy(deploy = c.deploy.copy(logUrl = Some(x)))
},
opt[String]("log-prefix") action { (x, c) =>
c.copy(deploy = c.deploy.copy(logPrefix = Some(x)))
}
)
note("")
cmd("undeploy").
text("Undeploy an engine instance as a prediction server.").
action { (_, c) =>
c.copy(commands = c.commands :+ "undeploy")
} children(
opt[String]("ip") action { (x, c) =>
c.copy(deploy = c.deploy.copy(ip = x))
},
opt[Int]("port") action { (x, c) =>
c.copy(deploy = c.deploy.copy(port = x))
} text("Port to unbind from. Default: 8000")
)
note("")
cmd("dashboard").
text("Launch a dashboard at the specific IP and port.").
action { (_, c) =>
c.copy(commands = c.commands :+ "dashboard")
} children(
opt[String]("ip") action { (x, c) =>
c.copy(dashboard = c.dashboard.copy(ip = x))
},
opt[Int]("port") action { (x, c) =>
c.copy(dashboard = c.dashboard.copy(port = x))
} text("Port to bind to. Default: 9000")
)
note("")
cmd("eventserver").
text("Launch an Event Server at the specific IP and port.").
action { (_, c) =>
c.copy(commands = c.commands :+ "eventserver")
} children(
opt[String]("ip") action { (x, c) =>
c.copy(eventServer = c.eventServer.copy(ip = x))
},
opt[Int]("port") action { (x, c) =>
c.copy(eventServer = c.eventServer.copy(port = x))
} text("Port to bind to. Default: 7070"),
opt[Unit]("stats") action { (x, c) =>
c.copy(eventServer = c.eventServer.copy(stats = true))
}
)
cmd("adminserver").
text("Launch an Admin Server at the specific IP and port.").
action { (_, c) =>
c.copy(commands = c.commands :+ "adminserver")
} children(
opt[String]("ip") action { (x, c) =>
c.copy(adminServer = c.adminServer.copy(ip = x))
} text("IP to bind to. Default: localhost"),
opt[Int]("port") action { (x, c) =>
c.copy(adminServer = c.adminServer.copy(port = x))
} text("Port to bind to. Default: 7071")
)
note("")
cmd("run").
text("Launch a driver program. This command will pass all\n" +
"pass-through arguments to its underlying spark-submit command.\n" +
"In addition, it also supports a second level of pass-through\n" +
"arguments to the driver program, e.g.\n" +
"pio run -- --master spark://localhost:7077 -- --driver-arg foo").
action { (_, c) =>
c.copy(commands = c.commands :+ "run")
} children(
arg[String]("<main class>") action { (x, c) =>
c.copy(mainClass = Some(x))
} text("Main class name of the driver program."),
opt[String]("sbt-extra") action { (x, c) =>
c.copy(build = c.build.copy(sbtExtra = Some(x)))
} text("Extra command to pass to SBT when it builds your engine."),
opt[Unit]("clean") action { (x, c) =>
c.copy(build = c.build.copy(sbtClean = true))
} text("Clean build."),
opt[Unit]("no-asm") action { (x, c) =>
c.copy(build = c.build.copy(sbtAssemblyPackageDependency = false))
} text("Skip building external dependencies assembly.")
)
note("")
cmd("status").
text("Displays status information about the PredictionIO system.").
action { (_, c) =>
c.copy(commands = c.commands :+ "status")
}
note("")
cmd("upgrade").
text("Upgrade tool").
action { (_, c) =>
c.copy(commands = c.commands :+ "upgrade")
} children(
arg[String]("<from version>") action { (x, c) =>
c.copy(upgrade = c.upgrade.copy(from = x))
} text("The version upgraded from."),
arg[String]("<to version>") action { (x, c) =>
c.copy(upgrade = c.upgrade.copy(to = x))
} text("The version upgraded to."),
arg[Int]("<old App ID>") action { (x, c) =>
c.copy(upgrade = c.upgrade.copy(oldAppId = x))
} text("Old App ID."),
arg[Int]("<new App ID>") action { (x, c) =>
c.copy(upgrade = c.upgrade.copy(newAppId = x))
} text("New App ID.")
)
note("")
cmd("app").
text("Manage apps.\n").
action { (_, c) =>
c.copy(commands = c.commands :+ "app")
} children(
cmd("new").
text("Create a new app key to app ID mapping.").
action { (_, c) =>
c.copy(commands = c.commands :+ "new")
} children(
opt[Int]("id") action { (x, c) =>
c.copy(app = c.app.copy(id = Some(x)))
} text("Specify this if you already have data under an app ID."),
opt[String]("description") action { (x, c) =>
c.copy(app = c.app.copy(description = Some(x)))
} text("Specify this if you already have data under an app ID."),
arg[String]("<name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("App name.")
),
note(""),
cmd("list").
text("List all apps.").
action { (_, c) =>
c.copy(commands = c.commands :+ "list")
},
note(""),
cmd("show").
text("Show details of an app.").
action { (_, c) =>
c.copy(commands = c.commands :+ "show")
} children (
arg[String]("<name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("Name of the app to be shown.")
),
note(""),
cmd("delete").
text("Delete an app.").
action { (_, c) =>
c.copy(commands = c.commands :+ "delete")
} children(
arg[String]("<name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("Name of the app to be deleted.")
),
note(""),
cmd("data-delete").
text("Delete data of an app").
action { (_, c) =>
c.copy(commands = c.commands :+ "data-delete")
} children(
arg[String]("<name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("Name of the app whose data to be deleted."),
opt[String]("channel") action { (x, c) =>
c.copy(app = c.app.copy(dataDeleteChannel = Some(x)))
} text("Name of channel whose data to be deleted."),
opt[Unit]("all") action { (x, c) =>
c.copy(app = c.app.copy(all = true))
} text("Delete data of all channels including default")
),
note(""),
cmd("channel-new").
text("Create a new channel for the app.").
action { (_, c) =>
c.copy(commands = c.commands :+ "channel-new")
} children (
arg[String]("<name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("App name."),
arg[String]("<channel>") action { (x, c) =>
c.copy(app = c.app.copy(channel = x))
} text ("Channel name to be created.")
),
note(""),
cmd("channel-delete").
text("Delete a channel of the app.").
action { (_, c) =>
c.copy(commands = c.commands :+ "channel-delete")
} children (
arg[String]("<name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("App name."),
arg[String]("<channel>") action { (x, c) =>
c.copy(app = c.app.copy(channel = x))
} text ("Channel name to be deleted.")
)
)
note("")
cmd("accesskey").
text("Manage app access keys.\n").
action { (_, c) =>
c.copy(commands = c.commands :+ "accesskey")
} children(
cmd("new").
text("Add allowed event(s) to an access key.").
action { (_, c) =>
c.copy(commands = c.commands :+ "new")
} children(
arg[String]("<app name>") action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("App to be associated with the new access key."),
arg[String]("[<event1> <event2> ...]") unbounded() optional()
action { (x, c) =>
c.copy(accessKey = c.accessKey.copy(
events = c.accessKey.events :+ x))
} text("Allowed event name(s) to be added to the access key.")
),
cmd("list").
text("List all access keys of an app.").
action { (_, c) =>
c.copy(commands = c.commands :+ "list")
} children(
arg[String]("<app name>") optional() action { (x, c) =>
c.copy(app = c.app.copy(name = x))
} text("App name.")
),
note(""),
cmd("delete").
text("Delete an access key.").
action { (_, c) =>
c.copy(commands = c.commands :+ "delete")
} children(
arg[String]("<access key>") action { (x, c) =>
c.copy(accessKey = c.accessKey.copy(accessKey = x))
} text("The access key to be deleted.")
)
)
cmd("template").
action { (_, c) =>
c.copy(commands = c.commands :+ "template")
} children(
cmd("get").
action { (_, c) =>
c.copy(commands = c.commands :+ "get")
} children(
arg[String]("<template ID>") required() action { (x, c) =>
c.copy(template = c.template.copy(repository = x))
},
arg[String]("<new engine directory>") action { (x, c) =>
c.copy(template = c.template.copy(directory = x))
},
opt[String]("version") action { (x, c) =>
c.copy(template = c.template.copy(version = Some(x)))
},
opt[String]("name") action { (x, c) =>
c.copy(template = c.template.copy(name = Some(x)))
},
opt[String]("package") action { (x, c) =>
c.copy(template = c.template.copy(packageName = Some(x)))
},
opt[String]("email") action { (x, c) =>
c.copy(template = c.template.copy(email = Some(x)))
}
),
cmd("list").
action { (_, c) =>
c.copy(commands = c.commands :+ "list")
}
)
cmd("export").
action { (_, c) =>
c.copy(commands = c.commands :+ "export")
} children(
opt[Int]("appid") required() action { (x, c) =>
c.copy(export = c.export.copy(appId = x))
},
opt[String]("output") required() action { (x, c) =>
c.copy(export = c.export.copy(outputPath = x))
},
opt[String]("format") action { (x, c) =>
c.copy(export = c.export.copy(format = x))
},
opt[String]("channel") action { (x, c) =>
c.copy(export = c.export.copy(channel = Some(x)))
}
)
cmd("import").
action { (_, c) =>
c.copy(commands = c.commands :+ "import")
} children(
opt[Int]("appid") required() action { (x, c) =>
c.copy(imprt = c.imprt.copy(appId = x))
},
opt[String]("input") required() action { (x, c) =>
c.copy(imprt = c.imprt.copy(inputPath = x))
},
opt[String]("channel") action { (x, c) =>
c.copy(imprt = c.imprt.copy(channel = Some(x)))
}
)
}
val separatorIndex = args.indexWhere(_ == "--")
val (consoleArgs, theRest) =
if (separatorIndex == -1) {
(args, Array[String]())
} else {
args.splitAt(separatorIndex)
}
val allPassThroughArgs = theRest.drop(1)
val secondSepIdx = allPassThroughArgs.indexWhere(_ == "--")
val (sparkPassThroughArgs, driverPassThroughArgs) =
if (secondSepIdx == -1) {
(allPassThroughArgs, Array[String]())
} else {
val t = allPassThroughArgs.splitAt(secondSepIdx)
(t._1, t._2.drop(1))
}
parser.parse(consoleArgs, ConsoleArgs()) map { pca =>
val ca = pca.copy(common = pca.common.copy(
sparkPassThrough = sparkPassThroughArgs,
driverPassThrough = driverPassThroughArgs))
WorkflowUtils.modifyLogging(ca.common.verbose)
val rv: Int = ca.commands match {
case Seq("") =>
System.err.println(help())
1
case Seq("version") =>
version(ca)
0
case Seq("build") =>
regenerateManifestJson(ca.common.manifestJson)
build(ca)
case Seq("unregister") =>
unregister(ca)
0
case Seq("train") =>
regenerateManifestJson(ca.common.manifestJson)
train(ca)
case Seq("eval") =>
regenerateManifestJson(ca.common.manifestJson)
train(ca)
case Seq("deploy") =>
deploy(ca)
case Seq("undeploy") =>
undeploy(ca)
case Seq("dashboard") =>
dashboard(ca)
0
case Seq("eventserver") =>
eventserver(ca)
0
case Seq("adminserver") =>
adminserver(ca)
0
case Seq("run") =>
generateManifestJson(ca.common.manifestJson)
run(ca)
case Seq("status") =>
status(ca)
case Seq("upgrade") =>
upgrade(ca)
0
case Seq("app", "new") =>
App.create(ca)
case Seq("app", "list") =>
App.list(ca)
case Seq("app", "show") =>
App.show(ca)
case Seq("app", "delete") =>
App.delete(ca)
case Seq("app", "data-delete") =>
App.dataDelete(ca)
case Seq("app", "channel-new") =>
App.channelNew(ca)
case Seq("app", "channel-delete") =>
App.channelDelete(ca)
case Seq("accesskey", "new") =>
AccessKey.create(ca)
case Seq("accesskey", "list") =>
AccessKey.list(ca)
case Seq("accesskey", "delete") =>
AccessKey.delete(ca)
case Seq("template", "get") =>
Template.get(ca)
case Seq("template", "list") =>
Template.list(ca)
case Seq("export") =>
Export.eventsToFile(ca, coreAssembly(ca.common.pioHome.get))
case Seq("import") =>
Import.fileToEvents(ca, coreAssembly(ca.common.pioHome.get))
case _ =>
System.err.println(help(ca.commands))
1
}
sys.exit(rv)
} getOrElse {
val command = args.toSeq.filterNot(_.startsWith("--")).head
System.err.println(help(Seq(command)))
sys.exit(1)
}
}
def help(commands: Seq[String] = Seq()): String = {
if (commands.isEmpty) {
mainHelp
} else {
val stripped =
(if (commands.head == "help") commands.drop(1) else commands).
mkString("-")
helpText.getOrElse(stripped, s"Help is unavailable for ${stripped}.")
}
}
val mainHelp = txt.main().toString
val helpText = Map(
"" -> mainHelp,
"status" -> txt.status().toString,
"upgrade" -> txt.upgrade().toString,
"version" -> txt.version().toString,
"template" -> txt.template().toString,
"build" -> txt.build().toString,
"train" -> txt.train().toString,
"deploy" -> txt.deploy().toString,
"eventserver" -> txt.eventserver().toString,
"adminserver" -> txt.adminserver().toString,
"app" -> txt.app().toString,
"accesskey" -> txt.accesskey().toString,
"import" -> txt.imprt().toString,
"export" -> txt.export().toString,
"run" -> txt.run().toString,
"eval" -> txt.eval().toString,
"dashboard" -> txt.dashboard().toString)
def version(ca: ConsoleArgs): Unit = println(BuildInfo.version)
def build(ca: ConsoleArgs): Int = {
Template.verifyTemplateMinVersion(new File("template.json"))
compile(ca)
info("Looking for an engine...")
val jarFiles = jarFilesForScala
if (jarFiles.size == 0) {
error("No engine found. Your build might have failed. Aborting.")
return 1
}
jarFiles foreach { f => info(s"Found ${f.getName}")}
val copyLocal = if (sys.env.contains("HADOOP_CONF_DIR")) {
info("HADOOP_CONF_DIR is set. Assuming HDFS is available.")
true
} else {
info("HADOOP_CONF_DIR is not set. Assuming HDFS is unavailable.")
false
}
val finalJarFiles = if (sys.env.contains("HADOOP_CONF_DIR") && !ca.build.uberJar) {
info("Also copying PredictionIO core assembly.")
jarFiles :+ coreAssembly(ca.common.pioHome.get)
} else {
jarFiles
}
RegisterEngine.registerEngine(
ca.common.manifestJson,
finalJarFiles,
copyLocal)
info("Your engine is ready for training.")
0
}
def unregister(ca: ConsoleArgs): Unit = {
RegisterEngine.unregisterEngine(ca.common.manifestJson)
}
def train(ca: ConsoleArgs): Int = {
Template.verifyTemplateMinVersion(new File("template.json"))
withRegisteredManifest(
ca.common.manifestJson,
ca.common.engineId,
ca.common.engineVersion) { em =>
if (ca.build.uberJar) {
val uniqueJars =
em.files.map(_.split(Path.SEPARATOR_CHAR).last).groupBy(identity).keys
if (uniqueJars.size > 1) {
error("Uber JAR mode cannot be turned on when current build produced " +
"more than 1 engine JAR files. Aborting.")
return 1
}
}
RunWorkflow.runWorkflow(
ca,
coreAssembly(ca.common.pioHome.get),
em,
ca.common.variantJson)
}
}
def deploy(ca: ConsoleArgs): Int = {
Template.verifyTemplateMinVersion(new File("template.json"))
withRegisteredManifest(
ca.common.manifestJson,
ca.common.engineId,
ca.common.engineVersion) { em =>
val variantJson = parse(Source.fromFile(ca.common.variantJson).mkString)
val variantId = variantJson \ "id" match {
case JString(s) => s
case _ =>
error("Unable to read engine variant ID from " +
s"${ca.common.variantJson.getCanonicalPath}. Aborting.")
return 1
}
val engineInstances = Storage.getMetaDataEngineInstances
val engineInstance = ca.engineInstanceId map { eid =>
engineInstances.get(eid)
} getOrElse {
engineInstances.getLatestCompleted(em.id, em.version, variantId)
}
engineInstance map { r =>
// undeploy(ca)
RunServer.runServer(
ca,
coreAssembly(ca.common.pioHome.get),
em,
r.id)
} getOrElse {
ca.engineInstanceId map { eid =>
error(
s"Invalid engine instance ID ${ca.engineInstanceId}. Aborting.")
} getOrElse {
error(
s"No valid engine instance found for engine ${em.id} " +
s"${em.version}.\nTry running 'train' before 'deploy'. Aborting.")
}
1
}
}
}
def dashboard(ca: ConsoleArgs): Unit = {
info(s"Creating dashboard at ${ca.dashboard.ip}:${ca.dashboard.port}")
Dashboard.createDashboard(DashboardConfig(
ip = ca.dashboard.ip,
port = ca.dashboard.port))
}
def eventserver(ca: ConsoleArgs): Unit = {
info(
s"Creating Event Server at ${ca.eventServer.ip}:${ca.eventServer.port}")
EventServer.createEventServer(EventServerConfig(
ip = ca.eventServer.ip,
port = ca.eventServer.port,
stats = ca.eventServer.stats))
}
def adminserver(ca: ConsoleArgs): Unit = {
info(
s"Creating Admin Server at ${ca.adminServer.ip}:${ca.adminServer.port}")
AdminServer.createAdminServer(AdminServerConfig(
ip = ca.adminServer.ip,
port = ca.adminServer.port
))
}
def undeploy(ca: ConsoleArgs): Int = {
val serverUrl = s"http://${ca.deploy.ip}:${ca.deploy.port}"
info(
s"Undeploying any existing engine instance at ${serverUrl}")
try {
val code = Http(s"${serverUrl}/stop").asString.code
code match {
case 200 => 0
case 404 =>
error(s"Another process is using ${serverUrl}. Unable to undeploy.")
1
case _ =>
error(s"Another process is using ${serverUrl}, or an existing " +
s"engine server is not responding properly (HTTP ${code}). " +
"Unable to undeploy.")
1
}
} catch {
case e: java.net.ConnectException =>
warn(s"Nothing at ${serverUrl}")
0
case _: Throwable =>
error("Another process might be occupying " +
s"${ca.deploy.ip}:${ca.deploy.port}. Unable to undeploy.")
1
}
}
def compile(ca: ConsoleArgs): Unit = {
// only add pioVersion to sbt if project/pio.sbt exists
if (new File("project", "pio-build.sbt").exists || ca.build.forceGeneratePIOSbt) {
FileUtils.writeLines(
new File("pio.sbt"),
Seq(
"// Generated automatically by pio build.",
"// Changes in this file will be overridden.",
"",
"pioVersion := \"" + BuildInfo.version + "\""))
}
implicit val formats = Utils.json4sDefaultFormats
try {
val engineFactory =
(parse(Source.fromFile("engine.json").mkString) \ "engineFactory").
extract[String]
WorkflowUtils.checkUpgrade("build", engineFactory)
} catch {
case e: Throwable => WorkflowUtils.checkUpgrade("build")
}
val sbt = detectSbt(ca)
info(s"Using command '${sbt}' at the current working directory to build.")
info("If the path above is incorrect, this process will fail.")
val asm =
if (ca.build.sbtAssemblyPackageDependency) {
" assemblyPackageDependency"
} else {
""
}
val clean = if (ca.build.sbtClean) " clean" else ""
val buildCmd = s"${sbt} ${ca.build.sbtExtra.getOrElse("")}${clean} " +
(if (ca.build.uberJar) "assembly" else s"package${asm}")
val core = new File(s"pio-assembly-${BuildInfo.version}.jar")
if (ca.build.uberJar) {
info(s"Uber JAR enabled. Putting ${core.getName} in lib.")
val dst = new File("lib")
dst.mkdir()
FileUtils.copyFileToDirectory(
coreAssembly(ca.common.pioHome.get),
dst,
true)
} else {
info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.")
new File("lib", core.getName).delete()
}
info(s"Going to run: ${buildCmd}")
try {
val r =
if (ca.common.verbose) {
buildCmd.!(ProcessLogger(line => info(line), line => error(line)))
} else {
buildCmd.!(ProcessLogger(
line => outputSbtError(line),
line => outputSbtError(line)))
}
if (r != 0) {
error(s"Return code of previous step is ${r}. Aborting.")
sys.exit(1)
}
info("Build finished successfully.")
} catch {
case e: java.io.IOException =>
error(s"${e.getMessage}")
sys.exit(1)
}
}
private def outputSbtError(line: String): Unit = {
"""\[.*error.*\]""".r findFirstIn line foreach { _ => error(line) }
}
def run(ca: ConsoleArgs): Int = {
compile(ca)
val extraFiles = WorkflowUtils.thirdPartyConfFiles
val jarFiles = jarFilesForScala
jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
val allJarFiles = jarFiles.map(_.getCanonicalPath)
val cmd = s"${getSparkHome(ca.common.sparkHome)}/bin/spark-submit --jars " +
s"${allJarFiles.mkString(",")} " +
(if (extraFiles.size > 0) {
s"--files ${extraFiles.mkString(",")} "
} else {
""
}) +
"--class " +
s"${ca.mainClass.get} ${ca.common.sparkPassThrough.mkString(" ")} " +
coreAssembly(ca.common.pioHome.get) + " " +
ca.common.driverPassThrough.mkString(" ")
val proc = Process(
cmd,
None,
"SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
map(kv => s"${kv._1}=${kv._2}").mkString(","))
info(s"Submission command: ${cmd}")
val r = proc.!
if (r != 0) {
error(s"Return code of previous step is ${r}. Aborting.")
return 1
}
r
}
def status(ca: ConsoleArgs): Int = {
println("PredictionIO")
ca.common.pioHome map { pioHome =>
println(s" Installed at: ${pioHome}")
println(s" Version: ${BuildInfo.version}")
} getOrElse {
println("Unable to locate PredictionIO installation. Aborting.")
return 1
}
println("")
val sparkHome = getSparkHome(ca.common.sparkHome)
if (new File(s"${sparkHome}/bin/spark-submit").exists) {
println(s"Apache Spark")
println(s" Installed at: ${sparkHome}")
val sparkMinVersion = "1.3.0"
val sparkReleaseFile = new File(s"${sparkHome}/RELEASE")
if (sparkReleaseFile.exists) {
val sparkReleaseStrings =
Source.fromFile(sparkReleaseFile).mkString.split(' ')
val sparkReleaseVersion = sparkReleaseStrings(1)
val parsedMinVersion = Version.apply(sparkMinVersion)
val parsedCurrentVersion = Version.apply(sparkReleaseVersion)
if (parsedCurrentVersion >= parsedMinVersion) {
println(s" Version: ${sparkReleaseVersion} (meets minimum " +
s"requirement of ${sparkMinVersion})")
} else {
println(" Version: ${sparkReleaseVersion}")
println("Apache Spark version does not meet minimum requirement. " +
"Aborting.")
}
} else {
println(" Version information cannot be found.")
println(" If you are using a developmental tree, please make sure")
println(s" you are using a version of at least ${sparkMinVersion}.")
}
} else {
println("Unable to locate a proper Apache Spark installation. Aborting.")
return 1
}
println("")
println("Storage Backend Connections")
try {
Storage.verifyAllDataObjects()
} catch {
case e: Throwable =>
e.printStackTrace
println("")
println("Unable to connect to all storage backend(s) successfully. " +
"Please refer to error message(s) above. Aborting.")
return 1
}
println("")
println("(sleeping 5 seconds for all messages to show up...)")
Thread.sleep(5000)
println("Your system is all ready to go.")
0
}
def upgrade(ca: ConsoleArgs): Unit = {
(ca.upgrade.from, ca.upgrade.to) match {
case ("0.8.2", "0.8.3") => {
Upgrade_0_8_3.runMain(ca.upgrade.oldAppId, ca.upgrade.newAppId)
}
case _ =>
println(s"Upgrade from version ${ca.upgrade.from} to ${ca.upgrade.to}"
+ s" is not supported.")
}
}
def coreAssembly(pioHome: String): File = {
val core = s"pio-assembly-${BuildInfo.version}.jar"
val coreDir =
if (new File(pioHome + File.separator + "RELEASE").exists) {
new File(pioHome + File.separator + "lib")
} else {
new File(pioHome + File.separator + "assembly")
}
val coreFile = new File(coreDir, core)
if (coreFile.exists) {
coreFile
} else {
error(s"PredictionIO Core Assembly (${coreFile.getCanonicalPath}) does " +
"not exist. Aborting.")
sys.exit(1)
}
}
val manifestAutogenTag = "pio-autogen-manifest"
def regenerateManifestJson(json: File): Unit = {
val cwd = sys.props("user.dir")
val ha = java.security.MessageDigest.getInstance("SHA-1").
digest(cwd.getBytes).map("%02x".format(_)).mkString
if (json.exists) {
val em = readManifestJson(json)
if (em.description == Some(manifestAutogenTag) && ha != em.version) {
warn("This engine project directory contains an auto-generated " +
"manifest that has been copied/moved from another location. ")
warn("Regenerating the manifest to reflect the updated location. " +
"This will dissociate with all previous engine instances.")
generateManifestJson(json)
} else {
info(s"Using existing engine manifest JSON at ${json.getCanonicalPath}")
}
} else {
generateManifestJson(json)
}
}
def generateManifestJson(json: File): Unit = {
val cwd = sys.props("user.dir")
implicit val formats = Utils.json4sDefaultFormats +
new EngineManifestSerializer
val rand = Random.alphanumeric.take(32).mkString
val ha = java.security.MessageDigest.getInstance("SHA-1").
digest(cwd.getBytes).map("%02x".format(_)).mkString
val em = EngineManifest(
id = rand,
version = ha,
name = new File(cwd).getName,
description = Some(manifestAutogenTag),
files = Seq(),
engineFactory = "")
try {
FileUtils.writeStringToFile(json, write(em), "ISO-8859-1")
} catch {
case e: java.io.IOException =>
error(s"Cannot generate ${json} automatically (${e.getMessage}). " +
"Aborting.")
sys.exit(1)
}
}
def readManifestJson(json: File): EngineManifest = {
implicit val formats = Utils.json4sDefaultFormats +
new EngineManifestSerializer
try {
read[EngineManifest](Source.fromFile(json).mkString)
} catch {
case e: java.io.FileNotFoundException =>
error(s"${json.getCanonicalPath} does not exist. Aborting.")
sys.exit(1)
case e: MappingException =>
error(s"${json.getCanonicalPath} has invalid content: " +
e.getMessage)
sys.exit(1)
}
}
def withRegisteredManifest(
json: File,
engineId: Option[String],
engineVersion: Option[String])(
op: EngineManifest => Int): Int = {
val ej = readManifestJson(json)
val id = engineId getOrElse ej.id
val version = engineVersion getOrElse ej.version
Storage.getMetaDataEngineManifests.get(id, version) map {
op
} getOrElse {
error(s"Engine ${id} ${version} cannot be found in the system.")
error("Possible reasons:")
error("- the engine is not yet built by the 'build' command;")
error("- the meta data store is offline.")
1
}
}
def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter {
_.getName.toLowerCase.endsWith(".jar")
}
def jarFilesForScala: Array[File] = {
val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib")))
val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" +
File.separator + s"scala-${scalaVersionNoPatch}")))
// Use libFiles is target is empty.
if (targetFiles.size > 0) targetFiles else libFiles
}
def jarFilesForScalaFilter(jars: Array[File]): Array[File] =
jars.filterNot { f =>
f.getName.toLowerCase.endsWith("-javadoc.jar") ||
f.getName.toLowerCase.endsWith("-sources.jar")
}
def recursiveListFiles(f: File): Array[File] = {
Option(f.listFiles) map { these =>
these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
} getOrElse Array[File]()
}
def getSparkHome(sparkHome: Option[String]): String = {
sparkHome getOrElse {
sys.env.get("SPARK_HOME").getOrElse(".")
}
}
def versionNoPatch(fullVersion: String): String = {
val v = """^(\d+\.\d+)""".r
val versionNoPatch = for {
v(np) <- v findFirstIn fullVersion
} yield np
versionNoPatch.getOrElse(fullVersion)
}
def scalaVersionNoPatch: String = versionNoPatch(BuildInfo.scalaVersion)
def detectSbt(ca: ConsoleArgs): String = {
ca.build.sbt map {
_.getCanonicalPath
} getOrElse {
val f = new File(Seq(ca.common.pioHome.get, "sbt", "sbt").mkString(
File.separator))
if (f.exists) f.getCanonicalPath else "sbt"
}
}
}