blob: 3720d23ca0f8d1c810c41c7b5d3fcd3da47e9f19 [file] [log] [blame]
/*
*
* Derby - Class org.apache.derbyTesting.junit.SpawnedProcess
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
package org.apache.derbyTesting.junit;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Timer;
import java.util.TimerTask;
import static junit.framework.Assert.assertTrue;
import static org.apache.derbyTesting.junit.BaseTestCase.execJavaCmd;
import static org.apache.derbyTesting.junit.BaseTestCase.getJavaExecutableName;
import static org.apache.derbyTesting.junit.BaseTestCase.isIBMJVM;
import static org.apache.derbyTesting.junit.BaseTestCase.isWindowsPlatform;
/**
* Utility code that wraps a spawned process (Java Process object).
* <p>
* There are three main aspects handled by this class:
* <ul> <li>Draining the output streams of the process.<br/>
* Happens automatically, the output gathered can be accessed with
* {@linkplain #getFailMessage}, {@linkplain #getFullServerError},
* {@linkplain #getFullServerOutput}, and
* {@linkplain #getNextServerOutput}</li>
* <li>Waiting for process completion, followed by cleanup (see
* {@linkplain #complete()} and {@linkplain #complete(long)})</li>
* <li>Forcibly destroying a process that live too long, for instance
* if inter-process communication hangs. This happens automatically
* if a threshold value is exceeded.</li>
* </ul>
* <p>
* <em>Implementation notes</em>: Active waiting is employed when waiting for
* the process to complete. This is considered acceptable since the expected
* usage pattern is to spawn the process, execute a set of tests, and then
* finally asking the process to shut down. Waiting for the process to
* complete is the last step, and a process typically lives only for a short
* period of time anyway (often only for seconds, seldom more than a few
* minutes).
* <br/>
* Forcibly destroying processes that live too long makes the test run
* continue even when facing inter-process communication hangs. The prime
* example is when both the client and the server are waiting for the other
* party to send data. Since the timeout is very high this feature is intended
* to avoid automated test runs from hanging indefinitely, for instance due to
* environmental issues affecting the process.
*/
//@NotThreadSafe
public final class SpawnedProcess {
private static final String TAG = "DEBUG: {SpawnedProcess} ";
private static Timer KILL_TIMER;
/**
* Property allowing the kill threshold to be overridden.
* <p>
* Interprets the numeric value as milliseconds, ignored if non-numeric.
* Overriding this value may be required if the test machine is extremely
* slow, or you want to kill hung processes earlier for some reason.
*/
private static final String KILL_THRESHOLD_PROPERTY =
"derby.tests.process.killThreshold";
private static final long KILL_THRESHOLD_DEFAULT = 45*60*1000; // 45 minutes
/** The maximum allowed time for a process to live. */
private static final long KILL_THRESHOLD;
static {
long tmpThreshold = KILL_THRESHOLD_DEFAULT;
String tmp = BaseTestCase.getSystemProperty(KILL_THRESHOLD_PROPERTY);
if (tmp != null) {
try {
tmpThreshold = Long.parseLong(tmp);
} catch (NumberFormatException nfe) {
// Ignore, use the default set previously.
System.err.println(TAG + "Invalid kill threshold: " + tmp);
}
}
KILL_THRESHOLD = tmpThreshold;
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException ie) {
// Ignore the interrupt. We want to make sure the process
// terminates before returning, and we don't want to preserve
// the interrupt flag because it causes Derby to shut down. These
// are test requirements and don't apply for production code.
// Print a notice to stdout.
System.out.println(TAG + "Interrupted while sleeping (ignored)");
}
}
private final String name;
private final Process javaProcess;
private final StreamSaver errSaver;
private final StreamSaver outSaver;
private boolean suppressOutput;
private final TimerTask killTask;
/**
* Creates a new wrapper to handle the given process.
*
* @param javaProcess a (running) process
* @param name name to associate with the process
*/
public SpawnedProcess(Process javaProcess, String name) {
this.javaProcess = javaProcess;
this.name = name;
errSaver = startStreamSaver(javaProcess.getErrorStream(), name
.concat(":System.err"));
outSaver = startStreamSaver(javaProcess.getInputStream(), name
.concat(":System.out"));
killTask = scheduleKill(javaProcess, name);
}
/**
* Schedules a task to kill/terminate the task after a predefined timeout.
*
* @param name name of the process
* @param process the process
* @return The task object.
*/
private TimerTask scheduleKill(Process process, String name) {
synchronized (KILL_THRESHOLD_PROPERTY) {
if (KILL_TIMER == null) {
// Can't use 1.5 methods yet due to J2ME. Add name later.
KILL_TIMER = new Timer(true);
}
}
TimerTask killer = new ProcessKillerTask(process, name);
KILL_TIMER.schedule(killer, KILL_THRESHOLD);
return killer;
}
/**
* Causes output obtained from the process to be suppressed when
* executing the {@code complete}-methods.
*
* @see #getFullServerOutput() to obtain suppressed output from stdout
* @see #getFullServerError() to obtain suppressed output from stderr
*/
public void suppressOutputOnComplete() {
suppressOutput = true;
}
/**
* Get the Java Process object
*/
public Process getProcess() {
return javaProcess;
}
/**
* <p>
* Get the full server output (stdout) as a string using the default
* encoding which is assumed is how it was originally written.
* </p>
*
* <p>
* This method should only be called after the process has completed.
* That is, {@link #complete()} or {@link #complete(long)}
* should be called first.
* </p>
*/
public String getFullServerOutput() throws InterruptedException {
// First wait until we've read all the output.
outSaver.thread.join();
synchronized (this) {
return outSaver.stream.toString();
}
}
/**
* Get the full server error output (stderr) as a string using the default
* encoding which is assumed is how it was originally written.
* <p>
* This method should only be called after the process has completed.
* That is, {@link #complete()} or {@link #complete(long)}
* should be called first.
*/
public String getFullServerError() throws InterruptedException {
// First wait until we've read all the output on stderr.
errSaver.thread.join();
synchronized (this) {
return errSaver.stream.toString();
}
}
/**
* Position offset for getNextServerOutput().
*/
int stdOutReadOffset;
/**
* Get the next set of server output (stdout) as a string using the default
* encoding which is assumed is how it was originally
* written. Assumes a single caller is executing the calls
* to this method.
*/
public String getNextServerOutput() {
byte[] fullData;
synchronized (this) {
fullData = outSaver.stream.toByteArray();
}
String output = new String(fullData, stdOutReadOffset,
fullData.length - stdOutReadOffset);
stdOutReadOffset = fullData.length;
return output;
}
/**
* Get a fail message that is the passed in reason plus
* the stderr and stdout for any output written. Allows
* easier debugging if the reason the process failed is there!
*/
public String getFailMessage(String reason) {
sleep(500);
StringBuffer sb = new StringBuffer();
sb.append(reason);
sb.append(":Spawned ");
sb.append(name);
sb.append(" exitCode=");
try {
sb.append(javaProcess.exitValue());
} catch (IllegalThreadStateException e) {
sb.append("running");
}
ByteArrayOutputStream err = errSaver.stream;
ByteArrayOutputStream out = outSaver.stream;
synchronized (this) {
if (err.size() != 0)
{
sb.append("\nSTDERR:\n");
sb.append(err.toString());
}
if (out.size() != 0)
{
sb.append("\nSTDOUT:\n");
sb.append(out.toString());
}
}
return sb.toString();
}
/**
* Waits for the process to terminate.
* <p>
* This call will block until one of the following conditions are met:
* <ul> <li>the process terminates on its own</li>
* <li>the hung-process watchdog mechanism forcibly terminates the
* process (see {@linkplain #scheduleKill})</li>
* @return The process exit code.
* @throws IOException if printing diagnostics fails
*/
public int complete()
throws IOException {
return complete(Long.MAX_VALUE);
}
/**
* Waits for the process to terminate, forcibly terminating it if it
* takes longer than the specified timeout.
* <p>
* This call will block until one of the following conditions are met:
* <ul> <li>the process terminates on its own</li>
* <li>the timeout is exceeded, at which point the process is
* forcibly destroyed</li>
* <li>the hung-process watchdog mechanism forcibly terminates the
* process (see {@linkplain #scheduleKill})</li>
* @param timeout the number of milliseconds to wait for the process
* to terminate normally before destroying it
* @return The process exit code.
* @throws IOException if printing diagnostics fails
*/
public int complete(long timeout)
throws IOException {
long start = System.currentTimeMillis();
Integer exitCode = null;
while (exitCode == null) {
try {
exitCode = javaProcess.exitValue();
} catch (IllegalThreadStateException itse) {
// This exception means the process is running.
if (System.currentTimeMillis() - start > timeout) {
javaProcess.destroy();
}
sleep(500);
}
}
// Clean up
killTask.cancel();
joinWith(errSaver.thread);
joinWith(outSaver.thread);
cleanupProcess();
printDiagnostics(exitCode.intValue());
return exitCode.intValue();
}
/**
* Cleans up the process, explicitly closing the streams associated with it.
*/
private void cleanupProcess() {
// Doing this is considered best practice.
closeStream(javaProcess.getOutputStream());
closeStream(javaProcess.getErrorStream());
closeStream(javaProcess.getInputStream());
javaProcess.destroy();
}
/**
* Prints diagnostics to stdout/stderr if the process failed.
*
* @param exitCode the exit code of the spawned process
* @throws IOException if writing to an output stream fails
* @see #suppressOutput
*/
private synchronized void printDiagnostics(int exitCode)
throws IOException {
// Always write the error, except when suppressed.
ByteArrayOutputStream err = errSaver.stream;
if (!suppressOutput && err.size() != 0) {
System.err.println("START-SPAWNED:" + name + " ERROR OUTPUT:");
err.writeTo(System.err);
System.err.println("END-SPAWNED :" + name + " ERROR OUTPUT:");
}
// Only write contents of stdout if it appears the server
// failed in some way, or output is suppressed.
ByteArrayOutputStream out = outSaver.stream;
if (!suppressOutput && exitCode != 0 && out.size() != 0) {
System.out.println("START-SPAWNED:" + name
+ " STANDARD OUTPUT: exit code=" + exitCode);
out.writeTo(System.out);
System.out.println("END-SPAWNED :" + name
+ " STANDARD OUTPUT:");
}
}
/** Joins up with the specified thread. */
private void joinWith(Thread t) {
try {
t.join();
} catch (InterruptedException ie) {
// Ignore the interrupt. We want to make sure the process
// terminates before returning, and we don't want to preserve
// the interrupt flag because it causes Derby to shut down. These
// are test requirements and don't apply for production code.
// Print a notice to stdout.
System.out.println(TAG + "Interrupted while joining " +
"with thread '" + t.toString() + "'");
}
}
/**
* Closes the specified stream, ignoring any exceptions.
*
* @param stream stream to close (may be {@code null})
*/
private void closeStream(Object stream) {
if (stream instanceof InputStream) {
try {
((InputStream)stream).close();
} catch (IOException ioe) {
// Ignore exception on close
}
} else if (stream instanceof OutputStream) {
try {
((OutputStream)stream).close();
} catch (IOException ioe) {
// Ignore exception on close
}
}
}
/**
* Class holding references to a stream that receives the output from a
* process and a thread that reads the process output and passes it on
* to the stream.
*/
private static class StreamSaver {
final ByteArrayOutputStream stream;
final Thread thread;
StreamSaver(ByteArrayOutputStream stream, Thread thread) {
this.stream = stream;
this.thread = thread;
}
}
/**
* Creates and starts a stream saver that reads the specified input stream
* in a separate stream.
*
* @param in input stream to read from
* @param name name of the thread
* @return A {@code StreamSaver} object.
*/
private StreamSaver startStreamSaver(final InputStream in,
final String name) {
final ByteArrayOutputStream out = new ByteArrayOutputStream() {
public void reset() {
super.reset();
new Throwable("WWW").printStackTrace(System.out);
}
};
Thread streamReader = new Thread(new Runnable() {
public void run() {
try {
byte[] buffer = new byte[1024];
int read;
while ((read = in.read(buffer)) != -1) {
synchronized (SpawnedProcess.this) {
out.write(buffer, 0, read);
}
}
} catch (IOException ioe) {
ioe.printStackTrace(new PrintStream(out, true));
}
}
}, name);
streamReader.setDaemon(true);
streamReader.start();
return new StreamSaver(out, streamReader);
}
/**
* A task that will kill the specified process.
*
* @see #scheduleKill(java.lang.Process, java.lang.String)
*/
private static class ProcessKillerTask
extends TimerTask {
private final String name;
private Process process;
public ProcessKillerTask(Process process, String name) {
this.process = process;
this.name = name;
}
public synchronized boolean cancel() {
// Since this task will usually be in the timer queue for a long
// time, nullify the process reference on cancel to free resources.
process = null;
return super.cancel();
}
public synchronized void run() {
// We may have just been cancelled
if (process == null) {
return;
}
System.err.println("DEBUG: Destroying process '" + name + "'");
process.destroy();
int retriesAllowed = 10;
while (retriesAllowed > 0) {
try {
int exitCode = process.exitValue();
System.err.println("DEBUG: Destroyed process '" + name +
"', exit code is " + exitCode);
break;
} catch (IllegalThreadStateException itse) {
// Sleep for a second and retry.
sleep(1000);
retriesAllowed--;
}
}
if (retriesAllowed == 0) {
System.err.println(
"DEBUG: Failed to destroy process '" + name + "'");
}
process = null;
}
}
/**
* Return {@code true} if the subprocess {@code p} has exited within {@code
* patience} milliseconds. Sleep {@code sleepInterval} between each check}.
* Note: you still need to call one of the {@link #complete} overloads even
* if using this method (which is optional). It can be used before trying
* a {@link #jstack} call.
*
* @param patience the maximum milliseconds we want to wait for
* @param sleepInterval sleep for this amount of milliseconds before trying
* testing again if not already exited the first time
* we check. If patience &lt;= sleepInterval we only
* check once.
* @return true if the process exited before our patience is up.
* @throws java.lang.InterruptedException
*/
@SuppressWarnings("SleepWhileInLoop")
public boolean waitForExit(long patience, long sleepInterval)
throws InterruptedException {
boolean completed = false;
while (!completed && patience > 0) {
try {
try {
javaProcess.exitValue();
completed = true;
} catch (IllegalThreadStateException e) {
// try again after sleeping
Thread.sleep(sleepInterval);
patience = patience - sleepInterval;
}
} catch (InterruptedException e) {
throw e;
}
}
return completed;
}
/**
* Return the jstack(1) dump of the process if possible.
* It will only work if we are running with a full JDK, not a simple JRE.
* It will not work on Windows, and just return an empty string.
* @return jstack dump if possible
* @throws PrivilegedActionException
* @throws InterruptedException
*/
public String jstack()
throws PrivilegedActionException, InterruptedException{
String output = "";
if (!isWindowsPlatform() && !isIBMJVM()) {
// Get the pid of the subprocess using reflection. Dirty,
// for Unix there is a private field pid in the implementing
// class.
final int pid = getPid();
final String execName = getJavaExecutableName().replace(
"jre" + File.separator + "bin" + File.separator + "java",
"bin" + File.separator + "jstack");
final String[] arguments =
new String[]{Integer.toString(pid)};
try {
final Process p2 =
execJavaCmd(execName, null, arguments, null, false);
final SpawnedProcess spawn2 = new SpawnedProcess(p2, "jstack");
spawn2.suppressOutputOnComplete();
// Close stdin of the process so that it stops
// any waiting for it and exits (shouldn't matter for this test)
p2.getOutputStream().close();
final int exitCode2 = spawn2.complete(30000); // 30 seconds
assertTrue(spawn2.getFailMessage("jstack failed: "),
exitCode2 == 0);
output = spawn2.getFullServerOutput();
} catch (IOException e) {
output = "Tried to catch jstack of hanging subprocess but it "
+ "failed (using JDK or JRE?): " + e;
}
}
return output;
}
/**
* Return the pid if on Unixen, or -1 on Windows (can't be obtained).
* @return pid
* @throws PrivilegedActionException
*/
public int getPid() throws PrivilegedActionException {
if (!isWindowsPlatform() && !isIBMJVM()) {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws IllegalAccessException,
NoSuchFieldException {
final Field f = javaProcess.getClass().
getDeclaredField("pid");
f.setAccessible(true);
return f.getInt(javaProcess);
}
});
} else {
return -1;
}
}
}