blob: 83165166bc1c108b8eae9100af8758f8082fe89c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.exec;
import org.apache.hadoop.io.IOUtils;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Execute an application.
*
* Hadoop's Shell class isn't used because it assumes it is executing
* a short lived application:
*/
public class RunLongLivedApp implements Runnable {
public static final int STREAM_READER_SLEEP_TIME = 200;
public static final int RECENT_LINE_LOG_LIMIT = 64;
/**
* Class log
*/
static final Logger LOG = LoggerFactory.getLogger(RunLongLivedApp.class);
/**
* Log supplied in the constructor for the spawned process
*/
final Logger processLog;
private final ProcessBuilder builder;
private Process process;
private Exception exception;
private Integer exitCode = null;
volatile boolean done;
private Thread execThread;
private Thread logThread;
private ProcessStreamReader processStreamReader;
//list of recent lines, recorded for extraction into reports
private final List<String> recentLines = new LinkedList<>();
private final int recentLineLimit = RECENT_LINE_LOG_LIMIT;
private ApplicationEventHandler applicationEventHandler;
public RunLongLivedApp(Logger processLog, String... commands) {
this.processLog = processLog;
builder = new ProcessBuilder(commands);
initBuilder();
}
public RunLongLivedApp(Logger processLog, List<String> commands) {
this.processLog = processLog;
builder = new ProcessBuilder(commands);
initBuilder();
}
private void initBuilder() {
builder.redirectErrorStream(false);
}
public ProcessBuilder getBuilder() {
return builder;
}
/**
* Set an optional application exit callback
* @param applicationEventHandler callback to notify on application exit
*/
public void setApplicationEventHandler(ApplicationEventHandler applicationEventHandler) {
this.applicationEventHandler = applicationEventHandler;
}
/**
* Add an entry to the environment
* @param key key -must not be null
* @param val value
*/
public void putEnv(String key, String val) {
if (val == null) {
throw new RuntimeException("Null value for key " + key);
}
builder.environment().put(key, val);
}
/**
* Bulk set the environment from a map. This does
* not replace the existing environment, just extend it/overwrite single
* entries.
* @param map map to add
*/
public void putEnvMap(Map<String, String> map) {
for (Map.Entry<String, String> entry : map.entrySet()) {
String val = entry.getValue();
String key = entry.getKey();
putEnv(key, val);
}
}
/**
* Get the process environment
* @param key
* @return
*/
public String getEnv(String key) {
return builder.environment().get(key);
}
/**
* Get the process reference
* @return the process -null if the process is not started
*/
public Process getProcess() {
return process;
}
/**
* Get any exception raised by the process
* @return an exception or null
*/
public Exception getException() {
return exception;
}
public List<String> getCommands() {
return builder.command();
}
public String getCommand() {
return getCommands().get(0);
}
/**
* probe to see if the process is running
* @return true iff the process has been started and is not yet finished
*/
public boolean isRunning() {
return process != null && !done;
}
/**
* Get the exit code: null until the process has finished
* @return the exit code or null
*/
public Integer getExitCode() {
return exitCode;
}
/**
* Stop the process if it is running.
* This will trigger an application completion event with the given exit code
*/
public void stop() {
if (!isRunning()) {
return;
}
process.destroy();
}
/**
* Get a text description of the builder suitable for log output
* @return a multiline string
*/
protected String describeBuilder() {
StringBuilder buffer = new StringBuilder();
for (String arg : builder.command()) {
buffer.append('"').append(arg).append("\" ");
}
return buffer.toString();
}
private void dumpEnv(StringBuilder buffer) {
buffer.append("\nEnvironment\n-----------");
Map<String, String> env = builder.environment();
Set<String> keys = env.keySet();
List<String> sortedKeys = new ArrayList<String>(keys);
Collections.sort(sortedKeys);
for (String key : sortedKeys) {
buffer.append(key).append("=").append(env.get(key)).append('\n');
}
}
/**
* Exec the process
* @return the process
* @throws IOException
*/
private Process spawnChildProcess() throws IOException, SliderException {
if (process != null) {
throw new SliderInternalStateException("Process already started");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Spawning process:\n " + describeBuilder());
}
process = builder.start();
return process;
}
/**
* Entry point for waiting for the program to finish
*/
@Override // Runnable
public void run() {
LOG.debug("Application callback thread running");
//notify the callback that the process has started
if (applicationEventHandler != null) {
applicationEventHandler.onApplicationStarted(this);
}
try {
exitCode = process.waitFor();
} catch (InterruptedException e) {
LOG.debug("Process wait interrupted -exiting thread");
} finally {
//here the process has finished
LOG.info("process has finished");
//tell the logger it has to finish too
done = true;
//now call the callback if it is set
if (applicationEventHandler != null) {
applicationEventHandler.onApplicationExited(this, exitCode);
}
try {
logThread.join();
} catch (InterruptedException ignored) {
//ignored
}
}
}
/**
* Create a thread to wait for this command to complete.
* THE THREAD IS NOT STARTED.
* @return the thread
* @throws IOException Execution problems
*/
private Thread spawnIntoThread() throws IOException, SliderException {
spawnChildProcess();
return new Thread(this, getCommand());
}
/**
* Spawn the application
* @throws IOException IO problems
* @throws SliderException internal state of this class is wrong
*/
public void spawnApplication() throws IOException, SliderException {
execThread = spawnIntoThread();
execThread.start();
processStreamReader =
new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME);
logThread = new Thread(processStreamReader, "IO");
logThread.start();
}
/**
* Get the lines of recent output
* @return the last few lines of output; an empty list if there are none
* or the process is not actually running
*/
public synchronized List<String> getRecentOutput() {
return new ArrayList<>(recentLines);
}
/**
* add the recent line to the list of recent lines; deleting
* an earlier on if the limit is reached.
*
* Implementation note: yes, a circular array would be more
* efficient, especially with some power of two as the modulo,
* but is it worth the complexity and risk of errors for
* something that is only called once per line of IO?
* @param line line to record
* @param isErrorStream is the line from the error stream
*/
private synchronized void recordRecentLine(String line,
boolean isErrorStream) {
if (line == null) {
return;
}
String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line;
recentLines.add(entry);
if (recentLines.size() > recentLineLimit) {
recentLines.remove(0);
}
}
/**
* Class to read data from the two process streams, and, when run in a thread
* to keep running until the <code>done</code> flag is set.
* Lines are fetched from stdout and stderr and logged at info and error
* respectively.
*/
private class ProcessStreamReader implements Runnable {
private final Logger streamLog;
private final int sleepTime;
private ProcessStreamReader(Logger streamLog, int sleepTime) {
this.streamLog = streamLog;
this.sleepTime = sleepTime;
}
private int readCharNonBlocking(BufferedReader reader) throws IOException {
if (reader.ready()) {
return reader.read();
} else {
return -1;
}
}
/**
* Read in a line, or, if the limit has been reached, the buffer
* so far
* @param reader source of data
* @param line line to build
* @param limit limit of line length
* @return true if the line can be printed
* @throws IOException IO trouble
*/
private boolean readAnyLine(BufferedReader reader,
StringBuilder line,
int limit)
throws IOException {
int next;
while ((-1 != (next = readCharNonBlocking(reader)))) {
if (next != '\n') {
line.append((char) next);
limit--;
if (line.length() > limit) {
//enough has been read in to print it any
return true;
}
} else {
//line end return flag to say so
return true;
}
}
//here the end of the stream is hit, or the limit
return false;
}
@Override //Runnable
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
public void run() {
BufferedReader errReader = null;
BufferedReader outReader = null;
StringBuilder outLine = new StringBuilder(256);
StringBuilder errorLine = new StringBuilder(256);
try {
errReader = new BufferedReader(new InputStreamReader(process
.getErrorStream()));
outReader = new BufferedReader(new InputStreamReader(process
.getInputStream()));
while (!done) {
boolean processed = false;
if (readAnyLine(errReader, errorLine, 256)) {
String line = errorLine.toString();
recordRecentLine(line, true);
streamLog.warn(line);
errorLine.setLength(0);
processed = true;
}
if (readAnyLine(outReader, outLine, 256)) {
String line = outLine.toString();
recordRecentLine(line, false);
streamLog.info(line);
outLine.setLength(0);
processed |= true;
}
if (!processed) {
//nothing processed: wait a bit for data.
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
//ignore this, rely on the done flag
LOG.debug("Ignoring ", e);
}
}
}
//get here, done time
//print the current error line then stream through the rest
streamLog.error(errorLine.toString());
String line = errReader.readLine();
while (line != null) {
streamLog.error(line);
if (Thread.interrupted()) {
break;
}
line = errReader.readLine();
recordRecentLine(line, true);
}
//now do the info line
streamLog.info(outLine.toString());
line = outReader.readLine();
while (line != null) {
streamLog.info(line);
if (Thread.interrupted()) {
break;
}
line = outReader.readLine();
recordRecentLine(line, false);
}
} catch (Exception ignored) {
LOG.warn("encountered ", ignored);
//process connection has been torn down
} finally {
IOUtils.closeStream(errReader);
IOUtils.closeStream(outReader);
}
}
}
}