blob: b76ec95bd8095f761db0e09a7cd0ff9249513b7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job
import joptsimple.{OptionSet, OptionSpec}
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.runtime.ApplicationRunnerOperation
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util._
object JobRunner extends Logging {
val SOURCE = "job-runner"
class JobRunnerCommandLine extends CommandLine {
var operationOpt: OptionSpec[String] =
parser.accepts("operation", "The operation to perform; run, status, kill.")
.withRequiredArg
.ofType(classOf[String])
.describedAs("operation=run")
.defaultsTo("run")
def getOperation(options: OptionSet): ApplicationRunnerOperation = {
val rawOp = options.valueOf(operationOpt)
ApplicationRunnerOperation.fromString(rawOp)
}
}
def main(args: Array[String]) {
val cmdline = new JobRunnerCommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
val operation = cmdline.getOperation(options)
val runner = new JobRunner(ConfigUtil.rewriteConfig(config))
doOperation(runner, operation)
}
def doOperation(runner: JobRunner, operation: ApplicationRunnerOperation): Unit = {
operation match {
case ApplicationRunnerOperation.RUN => runner.run()
case ApplicationRunnerOperation.KILL => runner.kill()
case ApplicationRunnerOperation.STATUS => println(runner.status())
case _ =>
throw new SamzaException("Invalid job runner operation: %s" format operation)
}
}
}
/**
* ConfigRunner is a helper class that sets up and executes a Samza job based
* on a config URI. The configFactory is instantiated, fed the configPath,
* and returns a Config, which is used to execute the job.
*/
class JobRunner(config: Config) extends Logging {
/**
* This function persist config in coordinator stream, create diagnostics stream if applicable and
* then submits the samza job.
* @param resetJobConfig This flag indicates whether or not to reset the job configurations in coordinator stream
* when submitting the job.
* If this value is set to true, all previously written configs to coordinator stream will be
* deleted, and only the configs in the input config file will have an affect. Otherwise, any
* config that is not deleted will have an affect.
* By default this value is set to true.
* @return The job submitted
*/
def run(resetJobConfig: Boolean = true): StreamJob = {
CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, resetJobConfig)
DiagnosticsUtil.createDiagnosticsStream(config)
submit()
}
/**
* This function submits the samza job.
*
* @return The job submitted
*/
def submit(): StreamJob = {
// Create the actual job, and submit it.
val job = getJobFactory.getJob(config)
job.submit()
info("Job submitted. Check status to determine when it is running.")
job
}
def kill(): Unit = {
val jobFactory: StreamJobFactory = getJobFactory
// Create the actual job, and kill it.
val job = jobFactory.getJob(config).kill()
info("Kill command executed. Check status to determine when it is terminated.")
}
def status(): ApplicationStatus = {
val jobFactory: StreamJobFactory = getJobFactory
// Create the actual job, and get its status.
jobFactory.getJob(config).getStatus
}
private def getJobFactory: StreamJobFactory = {
val jobConfig = new JobConfig(config)
val jobFactoryClass = JavaOptionals.toRichOptional(jobConfig.getStreamJobFactoryClass).toOption match {
case Some(factoryClass) => factoryClass
case _ => throw new SamzaException("no job factory class defined")
}
val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
info("job factory: %s" format (jobFactoryClass))
jobFactory
}
}