| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.job |
| |
| |
| import org.apache.samza.SamzaException |
| import org.apache.samza.config._ |
| import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer} |
| import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} |
| import org.apache.samza.metrics.MetricsRegistryMap |
| import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine |
| import org.apache.samza.runtime.ApplicationRunnerOperation |
| import org.apache.samza.system.{StreamSpec, SystemAdmins} |
| import org.apache.samza.util.ScalaJavaUtil.JavaOptionals |
| import org.apache.samza.util.{CoordinatorStreamUtil, Logging, StreamUtil, Util} |
| |
| import scala.collection.JavaConverters._ |
| |
| |
| object JobRunner extends Logging { |
| val SOURCE = "job-runner" |
| |
| def main(args: Array[String]) { |
| val cmdline = new ApplicationRunnerCommandLine |
| val options = cmdline.parser.parse(args: _*) |
| val config = cmdline.loadConfig(options) |
| val operation = cmdline.getOperation(options) |
| |
| val runner = new JobRunner(Util.rewriteConfig(config)) |
| doOperation(runner, operation) |
| } |
| |
| def doOperation(runner: JobRunner, operation: ApplicationRunnerOperation): Unit = { |
| operation match { |
| case ApplicationRunnerOperation.RUN => runner.run() |
| case ApplicationRunnerOperation.KILL => runner.kill() |
| case ApplicationRunnerOperation.STATUS => println(runner.status()) |
| case _ => |
| throw new SamzaException("Invalid job runner operation: %s" format operation) |
| } |
| } |
| } |
| |
| /** |
| * ConfigRunner is a helper class that sets up and executes a Samza job based |
| * on a config URI. The configFactory is instantiated, fed the configPath, |
| * and returns a Config, which is used to execute the job. |
| */ |
| class JobRunner(config: Config) extends Logging { |
| |
| /** |
| * This function submits the samza job. |
| * @param resetJobConfig This flag indicates whether or not to reset the job configurations when submitting the job. |
| * If this value is set to true, all previously written configs to coordinator stream will be |
| * deleted, and only the configs in the input config file will have an affect. Otherwise, any |
| * config that is not deleted will have an affect. |
| * By default this value is set to true. |
| * @return The job submitted |
| */ |
| def run(resetJobConfig: Boolean = true) = { |
| debug("config: %s" format (config)) |
| val jobFactory: StreamJobFactory = getJobFactory |
| val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) |
| val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap) |
| val systemAdmins = new SystemAdmins(config) |
| |
| // Create the coordinator stream if it doesn't exist |
| info("Creating coordinator stream") |
| val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config) |
| val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem) |
| coordinatorSystemAdmin.start() |
| CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin) |
| coordinatorSystemAdmin.stop() |
| |
| if (resetJobConfig) { |
| info("Storing config in coordinator stream.") |
| coordinatorSystemProducer.register(JobRunner.SOURCE) |
| coordinatorSystemProducer.start() |
| coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config) |
| } |
| info("Loading old config from coordinator stream.") |
| coordinatorSystemConsumer.register() |
| coordinatorSystemConsumer.start() |
| coordinatorSystemConsumer.bootstrap() |
| coordinatorSystemConsumer.stop() |
| |
| val oldConfig = coordinatorSystemConsumer.getConfig |
| if (resetJobConfig) { |
| val keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala) |
| info("Deleting old configs that are no longer defined: %s".format(keysToRemove)) |
| keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) }) |
| } |
| coordinatorSystemProducer.stop() |
| |
| |
| // if diagnostics is enabled, create diagnostics stream if it doesnt exist |
| if (new JobConfig(config).getDiagnosticsEnabled) { |
| val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id" |
| val diagnosticsSystemStreamName = JavaOptionals.toRichOptional( |
| new MetricsConfig(config) |
| .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)) |
| .toOption |
| .getOrElse(throw new ConfigException("Missing required config: " + |
| String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, |
| MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))) |
| |
| val diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName) |
| val diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem) |
| val diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream, |
| diagnosticsSystemStream.getSystem, new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID)) |
| |
| info("Creating diagnostics stream %s" format diagnosticsSystemStream.getStream) |
| diagnosticsSysAdmin.start() |
| if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) { |
| info("Created diagnostics stream %s" format diagnosticsSystemStream.getStream) |
| } else { |
| info("Diagnostics stream %s already exists" format diagnosticsSystemStream.getStream) |
| } |
| diagnosticsSysAdmin.stop() |
| } |
| |
| |
| // Create the actual job, and submit it. |
| val job = jobFactory.getJob(config) |
| |
| job.submit() |
| |
| info("Job submitted. Check status to determine when it is running.") |
| job |
| } |
| |
| def kill(): Unit = { |
| val jobFactory: StreamJobFactory = getJobFactory |
| |
| // Create the actual job, and kill it. |
| val job = jobFactory.getJob(config).kill() |
| |
| info("Kill command executed. Check status to determine when it is terminated.") |
| } |
| |
| def status(): ApplicationStatus = { |
| val jobFactory: StreamJobFactory = getJobFactory |
| |
| // Create the actual job, and get its status. |
| jobFactory.getJob(config).getStatus |
| } |
| |
| private def getJobFactory: StreamJobFactory = { |
| val jobConfig = new JobConfig(config) |
| val jobFactoryClass = JavaOptionals.toRichOptional(jobConfig.getStreamJobFactoryClass).toOption match { |
| case Some(factoryClass) => factoryClass |
| case _ => throw new SamzaException("no job factory class defined") |
| } |
| val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory] |
| info("job factory: %s" format (jobFactoryClass)) |
| jobFactory |
| } |
| } |