blob: e5864f9b449f84d57ed6881334b38033d81d06a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.environment;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A simple process manager which forks processes and kills them if necessary. */
@ThreadSafe
public class ProcessManager {
private static final Logger LOG = LoggerFactory.getLogger(ProcessManager.class);
/** For debugging purposes, we inherit I/O of processes. */
private static final boolean INHERIT_IO = LOG.isDebugEnabled();
/** A list of all managers to ensure all processes shutdown on JVM exit . */
private static final List<ProcessManager> ALL_PROCESS_MANAGERS = new ArrayList<>();
static {
// Install a shutdown hook to ensure processes are stopped/killed.
Runtime.getRuntime().addShutdownHook(ShutdownHook.create());
}
private final Map<String, Process> processes;
public static ProcessManager create() {
synchronized (ALL_PROCESS_MANAGERS) {
ProcessManager processManager = new ProcessManager();
ALL_PROCESS_MANAGERS.add(processManager);
return processManager;
}
}
private ProcessManager() {
this.processes = Collections.synchronizedMap(new HashMap<>());
}
static class RunningProcess {
private Process process;
RunningProcess(Process process) {
this.process = process;
}
/** Checks if the underlying process is still running. */
void isAliveOrThrow() throws IllegalStateException {
if (!process.isAlive()) {
throw new IllegalStateException("Process died with exit code " + process.exitValue());
}
}
@VisibleForTesting
Process getUnderlyingProcess() {
return process;
}
}
/**
* Forks a process with the given command and arguments.
*
* @param id A unique id for the process
* @param command the name of the executable to run
* @param args arguments to provide to the executable
* @return A RunningProcess which can be checked for liveness
*/
RunningProcess startProcess(String id, String command, List<String> args) throws IOException {
return startProcess(id, command, args, Collections.emptyMap());
}
/**
* Forks a process with the given command, arguments, and additional environment variables.
*
* @param id A unique id for the process
* @param command The name of the executable to run
* @param args Arguments to provide to the executable
* @param env Additional environment variables for the process to be forked
* @return A RunningProcess which can be checked for liveness
*/
public RunningProcess startProcess(
String id, String command, List<String> args, Map<String, String> env) throws IOException {
checkNotNull(id, "Process id must not be null");
checkNotNull(command, "Command must not be null");
checkNotNull(args, "Process args must not be null");
checkNotNull(env, "Environment map must not be null");
ProcessBuilder pb =
new ProcessBuilder(ImmutableList.<String>builder().add(command).addAll(args).build());
pb.environment().putAll(env);
if (INHERIT_IO) {
LOG.debug(
"==> DEBUG enabled: Inheriting stdout/stderr of process (adjustable in ProcessManager)");
pb.inheritIO();
} else {
pb.redirectErrorStream(true);
// Pipe stdout and stderr to /dev/null to avoid blocking the process due to filled PIPE buffer
if (System.getProperty("os.name", "").startsWith("Windows")) {
pb.redirectOutput(new File("nul"));
} else {
pb.redirectOutput(new File("/dev/null"));
}
}
LOG.debug("Attempting to start process with command: {}", pb.command());
Process newProcess = pb.start();
Process oldProcess = processes.put(id, newProcess);
if (oldProcess != null) {
stopProcess(id, oldProcess);
stopProcess(id, newProcess);
throw new IllegalStateException("There was already a process running with id " + id);
}
return new RunningProcess(newProcess);
}
/** Stops a previously started process identified by its unique id. */
public void stopProcess(String id) {
checkNotNull(id, "Process id must not be null");
Process process = checkNotNull(processes.remove(id), "Process for id does not exist: " + id);
stopProcess(id, process);
}
private void stopProcess(String id, Process process) {
if (process.isAlive()) {
LOG.debug("Attempting to stop process with id {}", id);
// first try to kill gracefully
process.destroy();
long maxTimeToWait = 2000;
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} shut down gracefully.", id);
} else {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} killed.", id);
} else {
LOG.warn("Process for worker {} could not be killed.", id);
}
}
}
}
/** Returns true if the process exists within maxWaitTimeMillis. */
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {
final long startTime = System.currentTimeMillis();
while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting on process", e);
}
}
return !process.isAlive();
}
private static class ShutdownHook extends Thread {
private static ShutdownHook create() {
return new ShutdownHook();
}
private ShutdownHook() {}
@Override
@SuppressFBWarnings("SWL_SLEEP_WITH_LOCK_HELD")
public void run() {
synchronized (ALL_PROCESS_MANAGERS) {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);
for (ProcessManager pm : ALL_PROCESS_MANAGERS) {
if (pm.processes.values().stream().anyMatch(Process::isAlive)) {
try {
// Graceful shutdown period
Thread.sleep(200);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
}
}
/** Stop all remaining processes gracefully, i.e. upon JVM shutdown */
private void stopAllProcesses() {
processes.forEach((id, process) -> process.destroy());
}
/** Kill all remaining processes forcibly, i.e. upon JVM shutdown */
private void killAllProcesses() {
processes.forEach((id, process) -> process.destroyForcibly());
}
}