| /* |
| * Copyright 2010 the original author or authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.pedjak.gradle.plugins.dockerizedtest; |
| |
| import static java.lang.String.format; |
| |
| import java.io.File; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.PipedInputStream; |
| import java.io.PipedOutputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import javax.annotation.Nullable; |
| |
| import com.github.dockerjava.api.DockerClient; |
| import com.github.dockerjava.api.command.CreateContainerCmd; |
| import com.github.dockerjava.api.model.Frame; |
| import com.github.dockerjava.api.model.StreamType; |
| import com.github.dockerjava.api.model.WaitResponse; |
| import com.github.dockerjava.core.command.AttachContainerResultCallback; |
| import com.github.dockerjava.core.command.WaitContainerResultCallback; |
| import com.github.dockerjava.api.model.Bind; |
| import com.github.dockerjava.api.model.Volume; |
| import com.google.common.base.Joiner; |
| import groovy.lang.Closure; |
| import org.gradle.api.logging.Logger; |
| import org.gradle.api.logging.Logging; |
| import org.gradle.initialization.BuildCancellationToken; |
| import org.gradle.internal.UncheckedException; |
| import org.gradle.internal.event.ListenerBroadcast; |
| import org.gradle.internal.operations.CurrentBuildOperationPreservingRunnable; |
| import org.gradle.process.ExecResult; |
| import org.gradle.process.internal.ExecException; |
| import org.gradle.process.internal.ExecHandle; |
| import org.gradle.process.internal.ExecHandleListener; |
| import org.gradle.process.internal.ExecHandleShutdownHookAction; |
| import org.gradle.process.internal.ExecHandleState; |
| import org.gradle.process.internal.ProcessSettings; |
| import org.gradle.process.internal.StreamsHandler; |
| import org.gradle.process.internal.shutdown.ShutdownHooks; |
| |
| /** |
| * Default implementation for the ExecHandle interface. |
| * |
| * <h3>State flows</h3> |
| * |
| * <ul> |
| * <li>INIT -> STARTED -> [SUCCEEDED|FAILED|ABORTED|DETACHED]</li> |
| * <li>INIT -> FAILED</li> |
| * <li>INIT -> STARTED -> DETACHED -> ABORTED</li> |
| * </ul> |
| * |
| * State is controlled on all control methods: |
| * <ul> |
| * <li>{@link #start()} allowed when state is INIT</li> |
| * <li>{@link #abort()} allowed when state is STARTED or DETACHED</li> |
| * </ul> |
| */ |
| public class DockerizedExecHandle implements ExecHandle, ProcessSettings { |
| |
| private static final Logger LOGGER = Logging.getLogger(DockerizedExecHandle.class); |
| |
| private final String displayName; |
| |
| /** |
| * The working directory of the process. |
| */ |
| private final File directory; |
| |
| /** |
| * The executable to run. |
| */ |
| private final String command; |
| |
| /** |
| * Arguments to pass to the executable. |
| */ |
| private final List<String> arguments; |
| |
| /** |
| * The variables to set in the environment the executable is run in. |
| */ |
| private final Map<String, String> environment; |
| private final StreamsHandler outputHandler; |
| private final StreamsHandler inputHandler; |
| private final boolean redirectErrorStream; |
| private int timeoutMillis; |
| private boolean daemon; |
| |
| /** |
| * Lock to guard all mutable state |
| */ |
| private final Lock lock; |
| private final Condition stateChanged; |
| |
| private final Executor executor; |
| |
| /** |
| * State of this ExecHandle. |
| */ |
| private ExecHandleState state; |
| |
| /** |
| * When not null, the runnable that is waiting |
| */ |
| private DockerizedExecHandleRunner execHandleRunner; |
| |
| private ExecResultImpl execResult; |
| |
| private final ListenerBroadcast<ExecHandleListener> broadcast; |
| |
| private final ExecHandleShutdownHookAction shutdownHookAction; |
| |
| private final BuildCancellationToken buildCancellationToken; |
| |
| private final DockerizedTestExtension testExtension; |
| |
| public DockerizedExecHandle(DockerizedTestExtension testExtension, String displayName, |
| File directory, String command, List<String> arguments, |
| Map<String, String> environment, StreamsHandler outputHandler, |
| StreamsHandler inputHandler, |
| List<ExecHandleListener> listeners, boolean redirectErrorStream, |
| int timeoutMillis, boolean daemon, |
| Executor executor, BuildCancellationToken buildCancellationToken) { |
| this.displayName = displayName; |
| this.directory = directory; |
| this.command = command; |
| this.arguments = arguments; |
| this.environment = environment; |
| this.outputHandler = outputHandler; |
| this.inputHandler = inputHandler; |
| this.redirectErrorStream = redirectErrorStream; |
| this.timeoutMillis = timeoutMillis; |
| this.daemon = daemon; |
| this.executor = executor; |
| this.buildCancellationToken = buildCancellationToken; |
| this.testExtension = testExtension; |
| lock = new ReentrantLock(); |
| stateChanged = lock.newCondition(); |
| state = ExecHandleState.INIT; |
| shutdownHookAction = new ExecHandleShutdownHookAction(this); |
| broadcast = new ListenerBroadcast<>(ExecHandleListener.class); |
| broadcast.addAll(listeners); |
| } |
| |
| @Override |
| public File getDirectory() { |
| return directory; |
| } |
| |
| @Override |
| public String getCommand() { |
| return command; |
| } |
| |
| public boolean isDaemon() { |
| return daemon; |
| } |
| |
| @Override |
| public String toString() { |
| return displayName; |
| } |
| |
| @Override |
| public List<String> getArguments() { |
| return Collections.unmodifiableList(arguments); |
| } |
| |
| @Override |
| public Map<String, String> getEnvironment() { |
| return Collections.unmodifiableMap(environment); |
| } |
| |
| @Override |
| public ExecHandleState getState() { |
| lock.lock(); |
| try { |
| return state; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void setState(ExecHandleState state) { |
| lock.lock(); |
| try { |
| LOGGER.debug("Changing state to: {}", state); |
| this.state = state; |
| stateChanged.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private boolean stateIn(ExecHandleState... states) { |
| lock.lock(); |
| try { |
| return Arrays.asList(states).contains(state); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void setEndStateInfo(ExecHandleState newState, int exitValue, Throwable failureCause) { |
| ShutdownHooks.removeShutdownHook(shutdownHookAction); |
| buildCancellationToken.removeCallback(shutdownHookAction); |
| ExecHandleState currentState; |
| lock.lock(); |
| try { |
| currentState = state; |
| } finally { |
| lock.unlock(); |
| } |
| |
| ExecResultImpl |
| newResult = |
| new ExecResultImpl(exitValue, execExceptionFor(failureCause, currentState), displayName); |
| if (!currentState.isTerminal() && newState != ExecHandleState.DETACHED) { |
| try { |
| broadcast.getSource().executionFinished(this, newResult); |
| } catch (Exception e) { |
| newResult = new ExecResultImpl(exitValue, execExceptionFor(e, currentState), displayName); |
| } |
| } |
| |
| lock.lock(); |
| try { |
| setState(newState); |
| execResult = newResult; |
| } finally { |
| lock.unlock(); |
| } |
| |
| LOGGER.debug("Process '{}' finished with exit value {} (state: {})", displayName, exitValue, |
| newState); |
| } |
| |
| @Nullable |
| private ExecException execExceptionFor(Throwable failureCause, ExecHandleState currentState) { |
| return failureCause != null |
| ? new ExecException(failureMessageFor(currentState), failureCause) |
| : null; |
| } |
| |
| private String failureMessageFor(ExecHandleState currentState) { |
| return currentState == ExecHandleState.STARTING |
| ? format("A problem occurred starting process '%s'", displayName) |
| : format("A problem occurred waiting for process '%s' to complete.", displayName); |
| } |
| |
| @Override |
| public ExecHandle start() { |
| LOGGER.info("Starting process '{}'. Working directory: {} Command: {}", |
| displayName, directory, command + ' ' + Joiner.on(' ').useForNull("null").join(arguments)); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Environment for process '{}': {}", displayName, environment); |
| } |
| lock.lock(); |
| try { |
| if (!stateIn(ExecHandleState.INIT)) { |
| throw new IllegalStateException( |
| format("Cannot start process '%s' because it has already been started", displayName)); |
| } |
| setState(ExecHandleState.STARTING); |
| |
| execHandleRunner = |
| new DockerizedExecHandleRunner(this, new CompositeStreamsHandler(), executor); |
| executor.execute(new CurrentBuildOperationPreservingRunnable(execHandleRunner)); |
| |
| while (stateIn(ExecHandleState.STARTING)) { |
| LOGGER.debug("Waiting until process started: {}.", displayName); |
| try { |
| if (!stateChanged.await(30, TimeUnit.SECONDS)) { |
| execHandleRunner.abortProcess(); |
| throw new RuntimeException("Giving up on " + execHandleRunner); |
| } |
| } catch (InterruptedException e) { |
| //ok, wrapping up |
| } |
| } |
| |
| if (execResult != null) { |
| execResult.rethrowFailure(); |
| } |
| |
| LOGGER.info("Successfully started process '{}'", displayName); |
| } finally { |
| lock.unlock(); |
| } |
| return this; |
| } |
| |
| @Override |
| public void abort() { |
| lock.lock(); |
| try { |
| if (stateIn(ExecHandleState.SUCCEEDED, ExecHandleState.FAILED, ExecHandleState.ABORTED)) { |
| return; |
| } |
| if (!stateIn(ExecHandleState.STARTED, ExecHandleState.DETACHED)) { |
| throw new IllegalStateException( |
| format("Cannot abort process '%s' because it is not in started or detached state", |
| displayName)); |
| } |
| execHandleRunner.abortProcess(); |
| waitForFinish(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public ExecResult waitForFinish() { |
| lock.lock(); |
| try { |
| while (!state.isTerminal()) { |
| try { |
| stateChanged.await(); |
| } catch (InterruptedException e) { |
| //ok, wrapping up... |
| throw UncheckedException.throwAsUncheckedException(e); |
| } |
| } |
| } finally { |
| lock.unlock(); |
| } |
| |
| // At this point: |
| // If in daemon mode, the process has started successfully and all streams to the process have been closed |
| // If in fork mode, the process has completed and all cleanup has been done |
| // In both cases, all asynchronous work for the process has completed and we're done |
| |
| return result(); |
| } |
| |
| private ExecResult result() { |
| lock.lock(); |
| try { |
| return execResult.rethrowFailure(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| void detached() { |
| setEndStateInfo(ExecHandleState.DETACHED, 0, null); |
| } |
| |
| void started() { |
| ShutdownHooks.addShutdownHook(shutdownHookAction); |
| buildCancellationToken.addCallback(shutdownHookAction); |
| setState(ExecHandleState.STARTED); |
| broadcast.getSource().executionStarted(this); |
| } |
| |
| void finished(int exitCode) { |
| if (exitCode != 0) { |
| setEndStateInfo(ExecHandleState.FAILED, exitCode, null); |
| } else { |
| setEndStateInfo(ExecHandleState.SUCCEEDED, 0, null); |
| } |
| } |
| |
| void aborted(int exitCode) { |
| if (exitCode == 0) { |
| // This can happen on Windows |
| exitCode = -1; |
| } |
| setEndStateInfo(ExecHandleState.ABORTED, exitCode, null); |
| } |
| |
| void failed(Throwable failureCause) { |
| setEndStateInfo(ExecHandleState.FAILED, -1, failureCause); |
| } |
| |
| @Override |
| public void addListener(ExecHandleListener listener) { |
| broadcast.add(listener); |
| } |
| |
| @Override |
| public void removeListener(ExecHandleListener listener) { |
| broadcast.remove(listener); |
| } |
| |
| public String getDisplayName() { |
| return displayName; |
| } |
| |
| @Override |
| public boolean getRedirectErrorStream() { |
| return redirectErrorStream; |
| } |
| |
| public int getTimeout() { |
| return timeoutMillis; |
| } |
| |
| public Process runContainer() { |
| try { |
| DockerClient client = testExtension.getClient(); |
| CreateContainerCmd createCmd = client.createContainerCmd(testExtension.getImage()) |
| .withTty(false) |
| .withStdinOpen(true) |
| .withWorkingDir(directory.getAbsolutePath()); |
| |
| createCmd.withEnv(getEnv()); |
| |
| String user = testExtension.getUser(); |
| if (user != null) { |
| createCmd.withUser(user); |
| } |
| bindVolumes(createCmd); |
| List<String> cmdLine = new ArrayList<>(); |
| cmdLine.add(command); |
| cmdLine.addAll(arguments); |
| createCmd.withCmd(cmdLine); |
| |
| invokeIfNotNull(testExtension.getBeforeContainerCreate(), createCmd, client); |
| String containerId = createCmd.exec().getId(); |
| invokeIfNotNull(testExtension.getAfterContainerCreate(), containerId, client); |
| |
| invokeIfNotNull(testExtension.getBeforeContainerStart(), containerId, client); |
| client.startContainerCmd(containerId).exec(); |
| invokeIfNotNull(testExtension.getAfterContainerStart(), containerId, client); |
| |
| if (!client.inspectContainerCmd(containerId).exec().getState().getRunning()) { |
| throw new RuntimeException("Container " + containerId + " not running!"); |
| } |
| |
| Process |
| proc = |
| new DockerizedProcess(client, containerId, testExtension.getAfterContainerStop()); |
| |
| return proc; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private void invokeIfNotNull(Closure closure, Object... args) { |
| if (closure != null) { |
| int l = closure.getParameterTypes().length; |
| Object[] nargs; |
| if (l < args.length) { |
| nargs = new Object[l]; |
| System.arraycopy(args, 0, nargs, 0, l); |
| } else { |
| nargs = args; |
| } |
| closure.call(nargs); |
| } |
| } |
| |
| private List<String> getEnv() { |
| List<String> env = new ArrayList<>(); |
| for (Map.Entry<String, String> e : environment.entrySet()) { |
| env.add(e.getKey() + "=" + e.getValue()); |
| } |
| return env; |
| } |
| |
| private void bindVolumes(CreateContainerCmd cmd) { |
| List<Volume> volumes = new ArrayList<>(); |
| List<Bind> binds = new ArrayList<>(); |
| for (Object o : testExtension.getVolumes().entrySet()) { |
| @SuppressWarnings("unchecked") |
| Map.Entry<Object, Object> e = (Map.Entry<Object, Object>) o; |
| Volume volume = new Volume(e.getValue().toString()); |
| Bind bind = new Bind(e.getKey().toString(), volume); |
| binds.add(bind); |
| volumes.add(volume); |
| } |
| cmd.withVolumes(volumes).withBinds(binds); |
| } |
| |
| private static class ExecResultImpl implements ExecResult { |
| private final int exitValue; |
| private final ExecException failure; |
| private final String displayName; |
| |
| ExecResultImpl(int exitValue, ExecException failure, String displayName) { |
| this.exitValue = exitValue; |
| this.failure = failure; |
| this.displayName = displayName; |
| } |
| |
| @Override |
| public int getExitValue() { |
| return exitValue; |
| } |
| |
| @Override |
| public ExecResult assertNormalExitValue() throws ExecException { |
| // all exit values are ok |
| // if (exitValue != 0) { |
| // throw new ExecException(format("Process '%s' finished with non-zero exit value %d", displayName, exitValue)); |
| // } |
| return this; |
| } |
| |
| @Override |
| public ExecResult rethrowFailure() throws ExecException { |
| if (failure != null) { |
| throw failure; |
| } |
| return this; |
| } |
| |
| @Override |
| public String toString() { |
| return "{exitValue=" + exitValue + ", failure=" + failure + "}"; |
| } |
| } |
| |
| private class CompositeStreamsHandler implements StreamsHandler { |
| @Override |
| public void connectStreams(Process process, String processName, Executor executor) { |
| inputHandler.connectStreams(process, processName, executor); |
| outputHandler.connectStreams(process, processName, executor); |
| } |
| |
| @Override |
| public void start() { |
| inputHandler.start(); |
| outputHandler.start(); |
| } |
| |
| @Override |
| public void stop() { |
| inputHandler.stop(); |
| outputHandler.stop(); |
| } |
| |
| @Override |
| public void disconnect() { |
| inputHandler.disconnect(); |
| outputHandler.disconnect(); |
| } |
| } |
| |
| private class DockerizedProcess extends Process { |
| |
| private final DockerClient dockerClient; |
| private final String containerId; |
| private final Closure afterContainerStop; |
| |
| private final PipedOutputStream stdInWriteStream = new PipedOutputStream(); |
| private final PipedInputStream stdOutReadStream = new PipedInputStream(); |
| private final PipedInputStream stdErrReadStream = new PipedInputStream(); |
| private final PipedInputStream stdInReadStream = new PipedInputStream(stdInWriteStream); |
| private final PipedOutputStream stdOutWriteStream = new PipedOutputStream(stdOutReadStream); |
| private final PipedOutputStream stdErrWriteStream = new PipedOutputStream(stdErrReadStream); |
| |
| private final CountDownLatch finished = new CountDownLatch(1); |
| private AtomicInteger exitCode = new AtomicInteger(); |
| private final AttachContainerResultCallback |
| attachContainerResultCallback = |
| new AttachContainerResultCallback() { |
| @Override |
| public void onNext(Frame frame) { |
| try { |
| if (frame.getStreamType().equals(StreamType.STDOUT)) { |
| stdOutWriteStream.write(frame.getPayload()); |
| } else if (frame.getStreamType().equals(StreamType.STDERR)) { |
| stdErrWriteStream.write(frame.getPayload()); |
| } |
| } catch (Exception e) { |
| LOGGER.error("Error while writing to stream:", e); |
| } |
| super.onNext(frame); |
| } |
| }; |
| |
| private final WaitContainerResultCallback |
| waitContainerResultCallback = |
| new WaitContainerResultCallback() { |
| @Override |
| public void onNext(WaitResponse waitResponse) { |
| exitCode.set(waitResponse.getStatusCode()); |
| try { |
| attachContainerResultCallback.close(); |
| attachContainerResultCallback.awaitCompletion(); |
| stdOutWriteStream.close(); |
| stdErrWriteStream.close(); |
| } catch (Exception e) { |
| LOGGER.debug("Error by detaching streams", e); |
| } finally { |
| try { |
| invokeIfNotNull(afterContainerStop, containerId, dockerClient); |
| } catch (Exception e) { |
| LOGGER.debug("Exception thrown at invoking afterContainerStop", e); |
| } finally { |
| finished.countDown(); |
| } |
| |
| } |
| |
| |
| } |
| }; |
| |
| public DockerizedProcess(final DockerClient dockerClient, final String containerId, |
| final Closure afterContainerStop) throws Exception { |
| this.dockerClient = dockerClient; |
| this.containerId = containerId; |
| this.afterContainerStop = afterContainerStop; |
| attachStreams(); |
| dockerClient.waitContainerCmd(containerId).exec(waitContainerResultCallback); |
| } |
| |
| private void attachStreams() throws Exception { |
| dockerClient.attachContainerCmd(containerId) |
| .withFollowStream(true) |
| .withStdOut(true) |
| .withStdErr(true) |
| .withStdIn(stdInReadStream) |
| .exec(attachContainerResultCallback); |
| if (!attachContainerResultCallback.awaitStarted(10, TimeUnit.SECONDS)) { |
| LOGGER.warn("Not attached to container " + containerId + " within 10secs"); |
| throw new RuntimeException("Not attached to container " + containerId + " within 10secs"); |
| } |
| } |
| |
| @Override |
| public OutputStream getOutputStream() { |
| return stdInWriteStream; |
| } |
| |
| @Override |
| public InputStream getInputStream() { |
| return stdOutReadStream; |
| } |
| |
| @Override |
| public InputStream getErrorStream() { |
| return stdErrReadStream; |
| } |
| |
| @Override |
| public int waitFor() throws InterruptedException { |
| finished.await(); |
| return exitCode.get(); |
| } |
| |
| @Override |
| public int exitValue() { |
| if (finished.getCount() > 0) { |
| throw new IllegalThreadStateException("docker process still running"); |
| } |
| return exitCode.get(); |
| } |
| |
| @Override |
| public void destroy() { |
| dockerClient.killContainerCmd(containerId).exec(); |
| } |
| |
| @Override |
| public String toString() { |
| return "Container " + containerId + " on " + dockerClient.toString(); |
| } |
| } |
| |
| } |