blob: 6a6abbccd1d4de5261a5cf0ac71e435aa653da55 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.process;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.internal.logging.LogService;
/**
* Reads the output stream of a Process.
*
* @author Kirk Lund
* @since 7.0
*/
public abstract class ProcessStreamReader implements Runnable {
private static final Logger logger = LogService.getLogger();
protected final Process process;
protected final InputStream inputStream;
protected final InputListener inputListener;
private Thread thread;
protected ProcessStreamReader(final Builder builder) {
this.process = builder.process;
this.inputStream = builder.inputStream;
if (builder.inputListener == null) {
this.inputListener = new InputListener() {
@Override
public void notifyInputLine(String line) {
// do nothing
}
@Override
public String toString() {
return "NullInputListener";
}
};
} else {
this.inputListener = builder.inputListener;
}
}
@Override
public void run() {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Running {}", this);
}
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine()) != null) {
this.inputListener.notifyInputLine(line);
}
} catch (IOException e) {
if (isDebugEnabled) {
logger.debug("Failure reading from buffered input stream: {}", e.getMessage(), e);
}
} finally {
try {
reader.close();
} catch (IOException e) {
if (isDebugEnabled) {
logger.debug("Failure closing buffered input stream reader: {}", e.getMessage(), e);
}
}
if (isDebugEnabled) {
logger.debug("Terminating {}", this);
}
}
}
public ProcessStreamReader start() {
synchronized (this) {
if (this.thread == null) {
this.thread = new Thread(this, createThreadName());
this.thread.setDaemon(true);
this.thread.start();
} else if (this.thread.isAlive()){
throw new IllegalStateException(this + " has already started");
} else {
throw new IllegalStateException(this + " was stopped and cannot be restarted");
}
}
return this;
}
public ProcessStreamReader stop() {
synchronized (this) {
if (this.thread != null && this.thread.isAlive()) {
this.thread.interrupt();
} else if (this.thread != null){
if (logger.isDebugEnabled()) {
logger.debug("{} has already been stopped", this);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("{} has not been started", this);
}
}
}
return this;
}
public ProcessStreamReader stopAsync(final long delayMillis) {
Runnable delayedStop = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
} finally {
stop();
}
}
};
String threadName = getClass().getSimpleName() + " stopAfterDelay Thread @" + Integer.toHexString(hashCode());
Thread thread = new Thread(delayedStop, threadName);
thread.setDaemon(true);
thread.start();
return this;
}
public boolean isRunning() {
synchronized (this) {
if (this.thread != null) {
return this.thread.isAlive();
}
}
return false;
}
public void join() throws InterruptedException {
Thread thread;
synchronized (this) {
thread = this.thread;
}
if (thread != null) {
thread.join();
}
}
public void join(final long millis) throws InterruptedException {
Thread thread;
synchronized (this) {
thread = this.thread;
}
if (thread != null) {
thread.join(millis);
}
}
public void join(final long millis, final int nanos) throws InterruptedException {
Thread thread;
synchronized (this) {
thread = this.thread;
}
if (thread != null) {
thread.join(millis, nanos);
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
sb.append(" Thread").append(" #").append(System.identityHashCode(this));
sb.append(" alive=").append(isRunning()); //this.thread == null ? false : this.thread.isAlive());
sb.append(" listener=").append(this.inputListener);
return sb.toString();
}
private String createThreadName() {
return getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
}
/**
* Defines the callback for lines of output found in the stream.
*/
public static interface InputListener {
public void notifyInputLine(String line);
}
/** Default ReadingMode is BLOCKING */
public static enum ReadingMode {
BLOCKING,
NON_BLOCKING;
}
/**
* Builds a ProcessStreamReader.
*
* @author Kirk Lund
* @since 8.2
*/
public static class Builder {
protected Process process;
protected InputStream inputStream;
protected InputListener inputListener;
protected long continueReadingMillis = 0;
protected ReadingMode readingMode = ReadingMode.BLOCKING;
public Builder(final Process process) {
this.process = process;
}
public Builder inputStream(final InputStream inputStream) {
this.inputStream = inputStream;
return this;
}
/** InputListener callback to invoke with read data */
public Builder inputListener(final InputListener inputListener) {
this.inputListener = inputListener;
return this;
}
/** millis to continue reading InputStream after Process terminates */
public Builder continueReadingMillis(final long continueReadingMillis) {
this.continueReadingMillis = continueReadingMillis;
return this;
}
/** ReadingMode to use for reading InputStream */
public Builder readingMode(final ReadingMode readingMode) {
this.readingMode = readingMode;
return this;
}
public ProcessStreamReader build() {
if (process == null) {
throw new NullPointerException("process may not be null");
}
if (inputStream == null) {
throw new NullPointerException("inputStream may not be null");
}
if (continueReadingMillis < 0) {
throw new IllegalArgumentException("continueReadingMillis must zero or positive");
}
switch (this.readingMode) {
case NON_BLOCKING: return new NonBlockingProcessStreamReader(this);
default: return new BlockingProcessStreamReader(this);
}
}
}
}