blob: 9e0285254e5834efd0daa29179640ebf178375ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.local
import java.util.concurrent.CountDownLatch
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
import org.apache.samza.util.Logging
import scala.collection.JavaConverters._
object ProcessJob {
private def createProcessBuilder(commandBuilder: CommandBuilder): ProcessBuilder = {
val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList.asJava)
processBuilder.environment.putAll(commandBuilder.buildEnvironment)
// Pipe all output to this process's streams.
processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT)
processBuilder
}
}
class ProcessJob(
commandBuilder: CommandBuilder,
val jobModelManager: JobModelManager,
val coordinatorStreamStore: CoordinatorStreamStore) extends StreamJob with Logging {
import ProcessJob._
val lock = new Object
val processBuilder: ProcessBuilder = createProcessBuilder(commandBuilder)
var jobStatus: ApplicationStatus = New
var processThread: Option[Thread] = None
def submit: StreamJob = {
val threadStartCountDownLatch = new CountDownLatch(1)
// Create a non-daemon thread to make job runner block until the job finishes.
// Without this, the proc dies when job runner ends.
processThread = Some(new Thread {
override def run {
var processExitCode = -1
var process: Option[Process] = None
setStatus(Running)
try {
threadStartCountDownLatch.countDown
process = Some(processBuilder.start)
processExitCode = process.get.waitFor
} catch {
case _: InterruptedException => process foreach { p => p.destroyForcibly }
case e: Exception => error("Encountered an error during job start: %s".format(e.getMessage))
} finally {
jobModelManager.stop
coordinatorStreamStore.close
setStatus(if (processExitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
}
}
})
info("Starting process job")
processThread.get.start
threadStartCountDownLatch.await
ProcessJob.this
}
def kill: StreamJob = {
getStatus match {
case Running => {
info("Attempting to kill running process job")
processThread foreach { thread =>
thread.interrupt
thread.join
info("Process job killed successfully")
}
}
case status => warn("Ignoring attempt to kill a process job that is not running. Job status is %s".format(status))
}
ProcessJob.this
}
def waitForFinish(timeoutMs: Long): ApplicationStatus = {
require(timeoutMs >= 0, "Timeout values must be non-negative")
processThread foreach { thread => thread.join(timeoutMs) }
getStatus
}
def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = lock.synchronized {
require(timeoutMs >= 0, "Timeout values must be non-negative")
timeoutMs match {
case 0 => {
info("Waiting for application status %s indefinitely".format(status))
while (getStatus != status) lock.wait(0)
}
case _ => {
info("Waiting for application status %s for %d ms".format(status, timeoutMs))
val startTimeMs = System.currentTimeMillis
var remainingTimeoutMs = timeoutMs
while (getStatus != status && remainingTimeoutMs > 0) {
lock.wait(remainingTimeoutMs)
val elapsedWaitTimeMs = System.currentTimeMillis - startTimeMs
remainingTimeoutMs = timeoutMs - elapsedWaitTimeMs
}
}
}
getStatus
}
def getStatus: ApplicationStatus = lock.synchronized {
jobStatus
}
private def setStatus(status: ApplicationStatus): Unit = lock.synchronized {
info("Changing process job status from %s to %s".format(jobStatus, status))
jobStatus = status
lock.notify
}
}