blob: 446b0dead19f5728dc2a94437c65eb648d3142be [file] [log] [blame]
/** Copyright 2014 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.workflow
import io.prediction.controller.EmptyParams
import io.prediction.controller.EngineParams
import io.prediction.controller.IEngineFactory
import io.prediction.controller.Metrics
import io.prediction.controller.Params
import io.prediction.controller.Utils
import io.prediction.controller.WorkflowParams
import io.prediction.core.Doer
import io.prediction.core.BaseMetrics
import io.prediction.data.storage.EngineInstance
import io.prediction.data.storage.Storage
import com.github.nscala_time.time.Imports._
import com.google.common.io.ByteStreams
import grizzled.slf4j.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.{ read, write }
import scala.language.existentials
import scala.reflect.Manifest
import scala.reflect.runtime.universe
import java.io.File
object CreateWorkflow extends Logging {
case class WorkflowConfig(
batch: String = "Transient Lazy Val",
engineId: String = "",
engineVersion: String = "",
engineFactory: String = "",
metricsClass: Option[String] = None,
dataSourceParamsJsonPath: Option[String] = None,
preparatorParamsJsonPath: Option[String] = None,
algorithmsParamsJsonPath: Option[String] = None,
servingParamsJsonPath: Option[String] = None,
metricsParamsJsonPath: Option[String] = None,
jsonBasePath: String = "",
env: Option[String] = None)
case class AlgorithmParams(name: String, params: JValue)
implicit lazy val formats = Utils.json4sDefaultFormats
val hadoopConf = new Configuration
val hdfs = FileSystem.get(hadoopConf)
private def stringFromFile(basePath: String, filePath: String): String = {
try {
val p =
if (basePath == "")
new Path(filePath)
else
new Path(basePath + Path.SEPARATOR + filePath)
new String(ByteStreams.toByteArray(hdfs.open(p)).map(_.toChar))
} catch {
case e: java.io.IOException =>
error(s"Error reading from file: ${e.getMessage}. Aborting workflow.")
sys.exit(1)
}
}
def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[WorkflowConfig]("CreateWorkflow") {
opt[String]("batch") action { (x, c) =>
c.copy(batch = x)
} text("Batch label of the workflow run.")
opt[String]("engineId") required() action { (x, c) =>
c.copy(engineId = x)
} text("Engine's ID.")
opt[String]("engineVersion") required() action { (x, c) =>
c.copy(engineVersion = x)
} text("Engine's version.")
opt[String]("engineFactory") required() action { (x, c) =>
c.copy(engineFactory = x)
} text("Class name of the engine's factory.")
opt[String]("metricsClass") action { (x, c) =>
c.copy(metricsClass = Some(x))
} text("Class name of the run's metrics.")
opt[String]("dsp") action { (x, c) =>
c.copy(dataSourceParamsJsonPath = Some(x))
} text("Path to data source parameters JSON file.")
opt[String]("pp") action { (x, c) =>
c.copy(preparatorParamsJsonPath = Some(x))
} text("Path to preparator parameters JSON file.")
opt[String]("ap") action { (x, c) =>
c.copy(algorithmsParamsJsonPath = Some(x))
} text("Path to algorithms parameters JSON file.")
opt[String]("sp") action { (x, c) =>
c.copy(servingParamsJsonPath = Some(x))
} text("Path to serving parameters JSON file.")
opt[String]("mp") action { (x, c) =>
c.copy(metricsParamsJsonPath = Some(x))
} text("Path to metrics parameters")
opt[String]("jsonBasePath") action { (x, c) =>
c.copy(jsonBasePath = x)
} text("Base path to prepend to all parameters JSON files.")
opt[String]("env") action { (x, c) =>
c.copy(env = Some(x))
} text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
"format) to pass to the Spark execution environment.")
}
parser.parse(args, WorkflowConfig()) map { wfc =>
val (engineLanguage, engine) = try {
WorkflowUtils.getEngine(wfc.engineFactory, getClass.getClassLoader)
} catch {
case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
sys.exit(1)
}
val metrics = wfc.metricsClass.map { mc => //mc => null
try {
Class.forName(mc)
.asInstanceOf[Class[BaseMetrics[_ <: Params, _, _, _, _, _, _, _ <: AnyRef]]]
} catch {
case e: ClassNotFoundException =>
error("Unable to obtain metrics class object ${mc}: " +
s"${e.getMessage}. Aborting workflow.")
sys.exit(1)
}
}
val dataSourceParams = wfc.dataSourceParamsJsonPath.map(p =>
WorkflowUtils.extractParams(
engineLanguage,
stringFromFile(wfc.jsonBasePath, p),
engine.dataSourceClass)).getOrElse(EmptyParams())
val preparatorParams = wfc.preparatorParamsJsonPath.map(p =>
WorkflowUtils.extractParams(
engineLanguage,
stringFromFile(wfc.jsonBasePath, p),
engine.preparatorClass)).getOrElse(EmptyParams())
val algorithmsParams: Seq[(String, Params)] =
wfc.algorithmsParamsJsonPath.map { p =>
val algorithmsParamsJson = parse(stringFromFile(wfc.jsonBasePath, p))
algorithmsParamsJson match {
case JArray(s) => s.map { algorithmParamsJValue =>
val eap = algorithmParamsJValue.extract[AlgorithmParams]
(
eap.name,
WorkflowUtils.extractParams(
engineLanguage,
compact(render(eap.params)),
engine.algorithmClassMap(eap.name))
)
}
case _ => Nil
}
} getOrElse Seq(("", EmptyParams()))
val servingParams = wfc.servingParamsJsonPath.map(p =>
WorkflowUtils.extractParams(
engineLanguage,
stringFromFile(wfc.jsonBasePath, p),
engine.servingClass)).getOrElse(EmptyParams())
val metricsParams = wfc.metricsParamsJsonPath.map(p =>
if (metrics.isEmpty)
EmptyParams()
else
WorkflowUtils.extractParams(
engineLanguage,
stringFromFile(wfc.jsonBasePath, p),
metrics.get)
) getOrElse EmptyParams()
val engineParams = new EngineParams(
dataSourceParams = dataSourceParams,
preparatorParams = preparatorParams,
algorithmParamsList = algorithmsParams,
servingParams = servingParams)
val metricsInstance = metrics
.map(m => Doer(m, metricsParams))
.getOrElse(null)
val pioEnvVars = wfc.env.map(e =>
e.split(',').flatMap(p =>
p.split('=') match {
case Array(k, v) => List(k -> v)
case _ => Nil
}
).toMap
).getOrElse(Map())
val engineInstance = EngineInstance(
id = "",
status = "INIT",
startTime = DateTime.now,
endTime = DateTime.now,
engineId = wfc.engineId,
engineVersion = wfc.engineVersion,
engineFactory = wfc.engineFactory,
metricsClass = wfc.metricsClass.getOrElse(""),
batch = wfc.batch,
env = pioEnvVars,
dataSourceParams = write(dataSourceParams),
preparatorParams = write(preparatorParams),
algorithmsParams = write(algorithmsParams),
servingParams = write(servingParams),
metricsParams = write(metricsParams),
multipleMetricsResults = "",
multipleMetricsResultsHTML = "",
multipleMetricsResultsJSON = "")
val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
engineInstance)
CoreWorkflow.runEngineTypeless(
env = pioEnvVars,
params = WorkflowParams(
verbose = 3,
batch = wfc.batch),
engine = engine,
engineParams = engineParams,
metrics = metricsInstance,
metricsParams = metricsParams,
engineInstance = Some(engineInstance.copy(id = engineInstanceId)))
}
}
}