blob: 4bc516852d95858f05ea1b8caf4d2d7d7c951047 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package org.apache.geode.internal.process;
import static org.apache.commons.lang3.Validate.isTrue;
import static org.apache.commons.lang3.Validate.notNull;
import org.apache.commons.lang3.SystemUtils;
import org.apache.geode.internal.logging.LoggingThread;
* Reads the output stream of a Process.
* @since GemFire 7.0
public abstract class ProcessStreamReader implements Runnable {
private static final int DEFAULT_PROCESS_OUTPUT_WAIT_TIME_MILLIS = 5000;
protected final Process process;
protected final InputStream inputStream;
protected final InputListener inputListener;
private Thread thread;
protected ProcessStreamReader(final Builder builder) {
notNull(builder, "Invalid builder '" + builder + "' specified");
this.process = builder.process;
this.inputStream = builder.inputStream;
if (builder.inputListener == null) {
this.inputListener = new InputListener() {
public void notifyInputLine(String line) {
// do nothing
public String toString() {
return "NullInputListener";
} else {
this.inputListener = builder.inputListener;
public ProcessStreamReader start() {
synchronized (this) {
if (thread == null) {
thread = new LoggingThread(createThreadName(), this);
} else if (thread.isAlive()) {
throw new IllegalStateException(this + " has already started");
} else {
throw new IllegalStateException(this + " was stopped and cannot be restarted");
return this;
public ProcessStreamReader stop() {
synchronized (this) {
if (thread != null && thread.isAlive()) {
return this;
public ProcessStreamReader stopAsync(final long delayMillis) {
Runnable delayedStop = () -> {
try {
} catch (InterruptedException ignored) {
} finally {
String threadName =
getClass().getSimpleName() + " stopAfterDelay Thread @" + Integer.toHexString(hashCode());
Thread thread = new LoggingThread(threadName, delayedStop);
return this;
public boolean isRunning() {
synchronized (this) {
if (thread != null) {
return thread.isAlive();
return false;
public ProcessStreamReader join() throws InterruptedException {
Thread thread;
synchronized (this) {
thread = this.thread;
if (thread != null) {
return this;
public ProcessStreamReader join(final long millis) throws InterruptedException {
Thread thread;
synchronized (this) {
thread = this.thread;
if (thread != null) {
return this;
public ProcessStreamReader join(final long millis, final int nanos) throws InterruptedException {
Thread thread;
synchronized (this) {
thread = this.thread;
if (thread != null) {
thread.join(millis, nanos);
return this;
public String toString() {
StringBuilder sb = new StringBuilder(getClass().getSimpleName());
sb.append(" Thread").append(" #").append(System.identityHashCode(this));
sb.append(" alive=").append(isRunning());
sb.append(" listener=").append(inputListener);
return sb.toString();
private String createThreadName() {
return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode());
* Defines the callback for lines of output found in the stream.
public interface InputListener {
void notifyInputLine(final String line);
/** Default ReadingMode is BLOCKING */
public enum ReadingMode {
private static String waitAndCaptureProcessStandardOutputStream(final Process process,
final long waitTimeMilliseconds) {
notNull(process, "Invalid process '" + process + "' specified");
return waitAndCaptureProcessStream(process, process.getInputStream(), waitTimeMilliseconds);
public static String waitAndCaptureProcessStandardErrorStream(final Process process) {
return waitAndCaptureProcessStandardErrorStream(process,
public static String waitAndCaptureProcessStandardErrorStream(final Process process,
final long waitTimeMilliseconds) {
return waitAndCaptureProcessStream(process, process.getErrorStream(), waitTimeMilliseconds);
private static String waitAndCaptureProcessStream(final Process process,
final InputStream processInputStream, final long waitTimeMilliseconds) {
StringBuffer buffer = new StringBuffer();
InputListener inputListener = line -> {
ProcessStreamReader reader = new ProcessStreamReader.Builder(process)
try {
long endTime = System.currentTimeMillis() + waitTimeMilliseconds;
while (System.currentTimeMillis() < endTime) {
try {
} catch (InterruptedException ignore) {
} finally {
return buffer.toString();
* Builds a ProcessStreamReader.
* @since GemFire 8.2
public static class Builder {
final Process process;
InputStream inputStream;
InputListener inputListener;
long continueReadingMillis = 0;
ReadingMode readingMode = ReadingMode.BLOCKING;
public Builder(final Process process) {
this.process = process;
public Builder inputStream(final InputStream inputStream) {
this.inputStream = inputStream;
return this;
/** InputListener callback to invoke with read data */
public Builder inputListener(final InputListener inputListener) {
this.inputListener = inputListener;
return this;
/** millis to continue reading InputStream after Process terminates */
public Builder continueReadingMillis(final long continueReadingMillis) {
this.continueReadingMillis = continueReadingMillis;
return this;
/** ReadingMode to use for reading InputStream */
public Builder readingMode(final ReadingMode readingMode) {
this.readingMode = readingMode;
return this;
public ProcessStreamReader build() {
notNull(process, "Invalid process '" + process + "' specified");
notNull(inputStream, "Invalid inputStream '" + inputStream + "' specified");
isTrue(continueReadingMillis >= 0,
"Invalid continueReadingMillis '" + continueReadingMillis + "' specified");
switch (readingMode) {
return new NonBlockingProcessStreamReader(this);
return new BlockingProcessStreamReader(this);