| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.commons.exec; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.PipedOutputStream; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadFactory; |
| |
| import org.apache.commons.exec.util.DebugUtils; |
| |
| /** |
| * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback |
| * from that stream will be lost. |
| */ |
| public class PumpStreamHandler implements ExecuteStreamHandler { |
| |
| /** Three seconds timeout. */ |
| private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2); |
| |
| /** |
| * Output thread. |
| */ |
| private Thread outputThread; |
| |
| /** |
| * Error thread. |
| */ |
| private Thread errorThread; |
| |
| /** Input thread. */ |
| private Thread inputThread; |
| |
| /** Output stream. */ |
| private final OutputStream outputStream; |
| |
| /** Error output stream. */ |
| private final OutputStream errorOutputStream; |
| |
| /** Error input stream. */ |
| private final InputStream inputStream; |
| |
| /** Pumper input stream. */ |
| private InputStreamPumper inputStreamPumper; |
| |
| /** The timeout Duration the implementation waits when stopping the pumper threads. */ |
| private Duration stopTimeout = Duration.ZERO; |
| |
| /** The last exception being caught. */ |
| private IOException caught; |
| |
| /** |
| * The thread factory. |
| */ |
| private final ThreadFactory threadFactory; |
| |
| /** |
| * Constructs a new {@link PumpStreamHandler}. |
| */ |
| public PumpStreamHandler() { |
| this(System.out, System.err); |
| } |
| |
| /** |
| * Constructs a new {@link PumpStreamHandler}. |
| * |
| * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream} |
| * implementation must be thread-safe because the output and error reader threads will |
| * concurrently write to it. |
| */ |
| public PumpStreamHandler(final OutputStream allOutputStream) { |
| this(allOutputStream, allOutputStream); |
| } |
| |
| /** |
| * Constructs a new {@link PumpStreamHandler}. |
| * |
| * <p>If the same {@link OutputStream} instance is used for output and error, then it must be |
| * thread-safe because the output and error reader threads will concurrently write to it. |
| * |
| * @param outputStream the output {@link OutputStream}. |
| * @param errorOutputStream the error {@link OutputStream}. |
| */ |
| public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) { |
| this(outputStream, errorOutputStream, null); |
| } |
| |
| /** |
| * Constructs a new {@link PumpStreamHandler}. |
| * |
| * <p>If the same {@link OutputStream} instance is used for output and error, then it must be |
| * thread-safe because the output and error reader threads will concurrently write to it. |
| * |
| * @param outputStream the output {@link OutputStream}. |
| * @param errorOutputStream the error {@link OutputStream}. |
| * @param inputStream the input {@link InputStream}. |
| */ |
| public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) { |
| this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream); |
| } |
| |
| /** |
| * Constructs a new {@link PumpStreamHandler}. |
| * |
| * <p>If the same {@link OutputStream} instance is used for output and error, then it must be |
| * thread-safe because the output and error reader threads will concurrently write to it. |
| * |
| * @param outputStream the output {@link OutputStream}. |
| * @param errorOutputStream the error {@link OutputStream}. |
| * @param inputStream the input {@link InputStream}. |
| */ |
| private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream, |
| final InputStream inputStream) { |
| this.threadFactory = threadFactory; |
| this.outputStream = outputStream; |
| this.errorOutputStream = errorOutputStream; |
| this.inputStream = inputStream; |
| } |
| |
| /** |
| * Create the pump to handle error output. |
| * |
| * @param is the {@link InputStream}. |
| * @param os the {@link OutputStream}. |
| */ |
| protected void createProcessErrorPump(final InputStream is, final OutputStream os) { |
| errorThread = createPump(is, os); |
| } |
| |
| /** |
| * Create the pump to handle process output. |
| * |
| * @param is the {@link InputStream}. |
| * @param os the {@link OutputStream}. |
| */ |
| protected void createProcessOutputPump(final InputStream is, final OutputStream os) { |
| outputThread = createPump(is, os); |
| } |
| |
| /** |
| * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterward |
| * to avoid an IOException ("Write end dead"). |
| * |
| * @param is the input stream to copy from. |
| * @param os the output stream to copy into. |
| * @return the stream pumper thread. |
| */ |
| protected Thread createPump(final InputStream is, final OutputStream os) { |
| return createPump(is, os, os instanceof PipedOutputStream); |
| } |
| |
| /** |
| * Creates a stream pumper to copy the given input stream to the given output stream. |
| * |
| * @param is the input stream to copy from. |
| * @param os the output stream to copy into. |
| * @param closeWhenExhausted close the output stream when the input stream is exhausted. |
| * @return the stream pumper thread. |
| */ |
| protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) { |
| return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true); |
| } |
| |
| /** |
| * Creates a stream pumper to copy the given input stream to the given output stream. |
| * |
| * @param is the System.in input stream to copy from. |
| * @param os the output stream to copy into. |
| * @return the stream pumper thread. |
| */ |
| private Thread createSystemInPump(final InputStream is, final OutputStream os) { |
| inputStreamPumper = new InputStreamPumper(is, os); |
| return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true); |
| } |
| |
| /** |
| * Gets the error stream. |
| * |
| * @return {@link OutputStream}. |
| */ |
| protected OutputStream getErr() { |
| return errorOutputStream; |
| } |
| |
| /** |
| * Gets the output stream. |
| * |
| * @return {@link OutputStream}. |
| */ |
| protected OutputStream getOut() { |
| return outputStream; |
| } |
| |
| Duration getStopTimeout() { |
| return stopTimeout; |
| } |
| |
| /** |
| * Sets the {@link InputStream} from which to read the standard error of the process. |
| * |
| * @param is the {@link InputStream}. |
| */ |
| @Override |
| public void setProcessErrorStream(final InputStream is) { |
| if (errorOutputStream != null) { |
| createProcessErrorPump(is, errorOutputStream); |
| } |
| } |
| |
| /** |
| * Sets the {@link OutputStream} by means of which input can be sent to the process. |
| * |
| * @param os the {@link OutputStream}. |
| */ |
| @Override |
| public void setProcessInputStream(final OutputStream os) { |
| if (inputStream != null) { |
| if (inputStream == System.in) { |
| inputThread = createSystemInPump(inputStream, os); |
| } else { |
| inputThread = createPump(inputStream, os, true); |
| } |
| } else { |
| try { |
| os.close(); |
| } catch (final IOException e) { |
| final String msg = "Got exception while closing output stream"; |
| DebugUtils.handleException(msg, e); |
| } |
| } |
| } |
| |
| /** |
| * Sets the {@link InputStream} from which to read the standard output of the process. |
| * |
| * @param is the {@link InputStream}. |
| */ |
| @Override |
| public void setProcessOutputStream(final InputStream is) { |
| if (outputStream != null) { |
| createProcessOutputPump(is, outputStream); |
| } |
| } |
| |
| /** |
| * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called. |
| * |
| * @param timeout timeout or zero to wait forever (default). |
| * @since 1.4.0 |
| */ |
| public void setStopTimeout(final Duration timeout) { |
| this.stopTimeout = timeout != null ? timeout : Duration.ZERO; |
| } |
| |
| /** |
| * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called. |
| * |
| * @param timeout timeout in milliseconds or zero to wait forever (default). |
| * @deprecated Use {@link #setStopTimeout(Duration)}. |
| */ |
| @Deprecated |
| public void setStopTimeout(final long timeout) { |
| this.stopTimeout = Duration.ofMillis(timeout); |
| } |
| |
| /** |
| * Starts the {@link Thread}s. |
| */ |
| @Override |
| public void start() { |
| start(outputThread); |
| start(errorThread); |
| start(inputThread); |
| } |
| |
| /** |
| * Starts the given {@link Thread}. |
| */ |
| private void start(final Thread thread) { |
| if (thread != null) { |
| thread.start(); |
| } |
| } |
| |
| /** |
| * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated. |
| */ |
| @Override |
| public void stop() throws IOException { |
| if (inputStreamPumper != null) { |
| inputStreamPumper.stopProcessing(); |
| } |
| stop(outputThread, stopTimeout); |
| stop(errorThread, stopTimeout); |
| stop(inputThread, stopTimeout); |
| |
| if (errorOutputStream != null && errorOutputStream != outputStream) { |
| try { |
| errorOutputStream.flush(); |
| } catch (final IOException e) { |
| final String msg = "Got exception while flushing the error stream : " + e.getMessage(); |
| DebugUtils.handleException(msg, e); |
| } |
| } |
| |
| if (outputStream != null) { |
| try { |
| outputStream.flush(); |
| } catch (final IOException e) { |
| final String msg = "Got exception while flushing the output stream"; |
| DebugUtils.handleException(msg, e); |
| } |
| } |
| |
| if (caught != null) { |
| throw caught; |
| } |
| } |
| |
| /** |
| * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout |
| * was exceeded an IOException is created to be thrown to the caller. |
| * |
| * @param thread the thread to be stopped. |
| * @param timeout the time in ms to wait to join. |
| */ |
| private void stop(final Thread thread, final Duration timeout) { |
| if (thread != null) { |
| try { |
| if (timeout.equals(Duration.ZERO)) { |
| thread.join(); |
| } else { |
| final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION); |
| final Instant startTime = Instant.now(); |
| thread.join(timeToWait.toMillis()); |
| if (Instant.now().isAfter(startTime.plus(timeToWait))) { |
| caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE); |
| } |
| } |
| } catch (final InterruptedException e) { |
| thread.interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout |
| * was exceeded an IOException is created to be thrown to the caller. |
| * |
| * @param thread the thread to be stopped. |
| * @param timeoutMillis the time in ms to wait to join. |
| */ |
| protected void stopThread(final Thread thread, final long timeoutMillis) { |
| stop(thread, Duration.ofMillis(timeoutMillis)); |
| } |
| } |