blob: 2d9326ad2c0c9e3f63a6eb283e4527818014a7b7 [file] [log] [blame]
/*
* Copyright 2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pedjak.gradle.plugins.dockerizedtest;
import static java.lang.String.format;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import com.github.dockerjava.api.model.WaitResponse;
import com.github.dockerjava.core.command.AttachContainerResultCallback;
import com.github.dockerjava.core.command.WaitContainerResultCallback;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Volume;
import com.google.common.base.Joiner;
import groovy.lang.Closure;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import org.gradle.initialization.BuildCancellationToken;
import org.gradle.internal.UncheckedException;
import org.gradle.internal.event.ListenerBroadcast;
import org.gradle.internal.operations.CurrentBuildOperationPreservingRunnable;
import org.gradle.process.ExecResult;
import org.gradle.process.internal.ExecException;
import org.gradle.process.internal.ExecHandle;
import org.gradle.process.internal.ExecHandleListener;
import org.gradle.process.internal.ExecHandleShutdownHookAction;
import org.gradle.process.internal.ExecHandleState;
import org.gradle.process.internal.ProcessSettings;
import org.gradle.process.internal.StreamsHandler;
import org.gradle.process.internal.shutdown.ShutdownHooks;
/**
* Default implementation for the ExecHandle interface.
*
* <h3>State flows</h3>
*
* <ul>
* <li>INIT -> STARTED -> [SUCCEEDED|FAILED|ABORTED|DETACHED]</li>
* <li>INIT -> FAILED</li>
* <li>INIT -> STARTED -> DETACHED -> ABORTED</li>
* </ul>
*
* State is controlled on all control methods:
* <ul>
* <li>{@link #start()} allowed when state is INIT</li>
* <li>{@link #abort()} allowed when state is STARTED or DETACHED</li>
* </ul>
*/
public class DockerizedExecHandle implements ExecHandle, ProcessSettings {
private static final Logger LOGGER = Logging.getLogger(DockerizedExecHandle.class);
private final String displayName;
/**
* The working directory of the process.
*/
private final File directory;
/**
* The executable to run.
*/
private final String command;
/**
* Arguments to pass to the executable.
*/
private final List<String> arguments;
/**
* The variables to set in the environment the executable is run in.
*/
private final Map<String, String> environment;
private final StreamsHandler outputHandler;
private final StreamsHandler inputHandler;
private final boolean redirectErrorStream;
private int timeoutMillis;
private boolean daemon;
/**
* Lock to guard all mutable state
*/
private final Lock lock;
private final Condition stateChanged;
private final Executor executor;
/**
* State of this ExecHandle.
*/
private ExecHandleState state;
/**
* When not null, the runnable that is waiting
*/
private DockerizedExecHandleRunner execHandleRunner;
private ExecResultImpl execResult;
private final ListenerBroadcast<ExecHandleListener> broadcast;
private final ExecHandleShutdownHookAction shutdownHookAction;
private final BuildCancellationToken buildCancellationToken;
private final DockerizedTestExtension testExtension;
public DockerizedExecHandle(DockerizedTestExtension testExtension, String displayName,
File directory, String command, List<String> arguments,
Map<String, String> environment, StreamsHandler outputHandler,
StreamsHandler inputHandler,
List<ExecHandleListener> listeners, boolean redirectErrorStream,
int timeoutMillis, boolean daemon,
Executor executor, BuildCancellationToken buildCancellationToken) {
this.displayName = displayName;
this.directory = directory;
this.command = command;
this.arguments = arguments;
this.environment = environment;
this.outputHandler = outputHandler;
this.inputHandler = inputHandler;
this.redirectErrorStream = redirectErrorStream;
this.timeoutMillis = timeoutMillis;
this.daemon = daemon;
this.executor = executor;
this.buildCancellationToken = buildCancellationToken;
this.testExtension = testExtension;
lock = new ReentrantLock();
stateChanged = lock.newCondition();
state = ExecHandleState.INIT;
shutdownHookAction = new ExecHandleShutdownHookAction(this);
broadcast = new ListenerBroadcast<>(ExecHandleListener.class);
broadcast.addAll(listeners);
}
@Override
public File getDirectory() {
return directory;
}
@Override
public String getCommand() {
return command;
}
public boolean isDaemon() {
return daemon;
}
@Override
public String toString() {
return displayName;
}
@Override
public List<String> getArguments() {
return Collections.unmodifiableList(arguments);
}
@Override
public Map<String, String> getEnvironment() {
return Collections.unmodifiableMap(environment);
}
@Override
public ExecHandleState getState() {
lock.lock();
try {
return state;
} finally {
lock.unlock();
}
}
private void setState(ExecHandleState state) {
lock.lock();
try {
LOGGER.debug("Changing state to: {}", state);
this.state = state;
stateChanged.signalAll();
} finally {
lock.unlock();
}
}
private boolean stateIn(ExecHandleState... states) {
lock.lock();
try {
return Arrays.asList(states).contains(state);
} finally {
lock.unlock();
}
}
private void setEndStateInfo(ExecHandleState newState, int exitValue, Throwable failureCause) {
ShutdownHooks.removeShutdownHook(shutdownHookAction);
buildCancellationToken.removeCallback(shutdownHookAction);
ExecHandleState currentState;
lock.lock();
try {
currentState = state;
} finally {
lock.unlock();
}
ExecResultImpl
newResult =
new ExecResultImpl(exitValue, execExceptionFor(failureCause, currentState), displayName);
if (!currentState.isTerminal() && newState != ExecHandleState.DETACHED) {
try {
broadcast.getSource().executionFinished(this, newResult);
} catch (Exception e) {
newResult = new ExecResultImpl(exitValue, execExceptionFor(e, currentState), displayName);
}
}
lock.lock();
try {
setState(newState);
execResult = newResult;
} finally {
lock.unlock();
}
LOGGER.debug("Process '{}' finished with exit value {} (state: {})", displayName, exitValue,
newState);
}
@Nullable
private ExecException execExceptionFor(Throwable failureCause, ExecHandleState currentState) {
return failureCause != null
? new ExecException(failureMessageFor(currentState), failureCause)
: null;
}
private String failureMessageFor(ExecHandleState currentState) {
return currentState == ExecHandleState.STARTING
? format("A problem occurred starting process '%s'", displayName)
: format("A problem occurred waiting for process '%s' to complete.", displayName);
}
@Override
public ExecHandle start() {
LOGGER.info("Starting process '{}'. Working directory: {} Command: {}",
displayName, directory, command + ' ' + Joiner.on(' ').useForNull("null").join(arguments));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Environment for process '{}': {}", displayName, environment);
}
lock.lock();
try {
if (!stateIn(ExecHandleState.INIT)) {
throw new IllegalStateException(
format("Cannot start process '%s' because it has already been started", displayName));
}
setState(ExecHandleState.STARTING);
execHandleRunner =
new DockerizedExecHandleRunner(this, new CompositeStreamsHandler(), executor);
executor.execute(new CurrentBuildOperationPreservingRunnable(execHandleRunner));
while (stateIn(ExecHandleState.STARTING)) {
LOGGER.debug("Waiting until process started: {}.", displayName);
try {
if (!stateChanged.await(30, TimeUnit.SECONDS)) {
execHandleRunner.abortProcess();
throw new RuntimeException("Giving up on " + execHandleRunner);
}
} catch (InterruptedException e) {
//ok, wrapping up
}
}
if (execResult != null) {
execResult.rethrowFailure();
}
LOGGER.info("Successfully started process '{}'", displayName);
} finally {
lock.unlock();
}
return this;
}
@Override
public void abort() {
lock.lock();
try {
if (stateIn(ExecHandleState.SUCCEEDED, ExecHandleState.FAILED, ExecHandleState.ABORTED)) {
return;
}
if (!stateIn(ExecHandleState.STARTED, ExecHandleState.DETACHED)) {
throw new IllegalStateException(
format("Cannot abort process '%s' because it is not in started or detached state",
displayName));
}
execHandleRunner.abortProcess();
waitForFinish();
} finally {
lock.unlock();
}
}
@Override
public ExecResult waitForFinish() {
lock.lock();
try {
while (!state.isTerminal()) {
try {
stateChanged.await();
} catch (InterruptedException e) {
//ok, wrapping up...
throw UncheckedException.throwAsUncheckedException(e);
}
}
} finally {
lock.unlock();
}
// At this point:
// If in daemon mode, the process has started successfully and all streams to the process have been closed
// If in fork mode, the process has completed and all cleanup has been done
// In both cases, all asynchronous work for the process has completed and we're done
return result();
}
private ExecResult result() {
lock.lock();
try {
return execResult.rethrowFailure();
} finally {
lock.unlock();
}
}
void detached() {
setEndStateInfo(ExecHandleState.DETACHED, 0, null);
}
void started() {
ShutdownHooks.addShutdownHook(shutdownHookAction);
buildCancellationToken.addCallback(shutdownHookAction);
setState(ExecHandleState.STARTED);
broadcast.getSource().executionStarted(this);
}
void finished(int exitCode) {
if (exitCode != 0) {
setEndStateInfo(ExecHandleState.FAILED, exitCode, null);
} else {
setEndStateInfo(ExecHandleState.SUCCEEDED, 0, null);
}
}
void aborted(int exitCode) {
if (exitCode == 0) {
// This can happen on Windows
exitCode = -1;
}
setEndStateInfo(ExecHandleState.ABORTED, exitCode, null);
}
void failed(Throwable failureCause) {
setEndStateInfo(ExecHandleState.FAILED, -1, failureCause);
}
@Override
public void addListener(ExecHandleListener listener) {
broadcast.add(listener);
}
@Override
public void removeListener(ExecHandleListener listener) {
broadcast.remove(listener);
}
public String getDisplayName() {
return displayName;
}
@Override
public boolean getRedirectErrorStream() {
return redirectErrorStream;
}
public int getTimeout() {
return timeoutMillis;
}
public Process runContainer() {
try {
DockerClient client = testExtension.getClient();
CreateContainerCmd createCmd = client.createContainerCmd(testExtension.getImage())
.withTty(false)
.withStdinOpen(true)
.withWorkingDir(directory.getAbsolutePath());
createCmd.withEnv(getEnv());
String user = testExtension.getUser();
if (user != null) {
createCmd.withUser(user);
}
bindVolumes(createCmd);
List<String> cmdLine = new ArrayList<>();
cmdLine.add(command);
cmdLine.addAll(arguments);
createCmd.withCmd(cmdLine);
invokeIfNotNull(testExtension.getBeforeContainerCreate(), createCmd, client);
String containerId = createCmd.exec().getId();
invokeIfNotNull(testExtension.getAfterContainerCreate(), containerId, client);
invokeIfNotNull(testExtension.getBeforeContainerStart(), containerId, client);
client.startContainerCmd(containerId).exec();
invokeIfNotNull(testExtension.getAfterContainerStart(), containerId, client);
if (!client.inspectContainerCmd(containerId).exec().getState().getRunning()) {
throw new RuntimeException("Container " + containerId + " not running!");
}
Process
proc =
new DockerizedProcess(client, containerId, testExtension.getAfterContainerStop());
return proc;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private void invokeIfNotNull(Closure closure, Object... args) {
if (closure != null) {
int l = closure.getParameterTypes().length;
Object[] nargs;
if (l < args.length) {
nargs = new Object[l];
System.arraycopy(args, 0, nargs, 0, l);
} else {
nargs = args;
}
closure.call(nargs);
}
}
private List<String> getEnv() {
List<String> env = new ArrayList<>();
for (Map.Entry<String, String> e : environment.entrySet()) {
env.add(e.getKey() + "=" + e.getValue());
}
return env;
}
private void bindVolumes(CreateContainerCmd cmd) {
List<Volume> volumes = new ArrayList<>();
List<Bind> binds = new ArrayList<>();
for (Object o : testExtension.getVolumes().entrySet()) {
@SuppressWarnings("unchecked")
Map.Entry<Object, Object> e = (Map.Entry<Object, Object>) o;
Volume volume = new Volume(e.getValue().toString());
Bind bind = new Bind(e.getKey().toString(), volume);
binds.add(bind);
volumes.add(volume);
}
cmd.withVolumes(volumes).withBinds(binds);
}
private static class ExecResultImpl implements ExecResult {
private final int exitValue;
private final ExecException failure;
private final String displayName;
ExecResultImpl(int exitValue, ExecException failure, String displayName) {
this.exitValue = exitValue;
this.failure = failure;
this.displayName = displayName;
}
@Override
public int getExitValue() {
return exitValue;
}
@Override
public ExecResult assertNormalExitValue() throws ExecException {
// all exit values are ok
// if (exitValue != 0) {
// throw new ExecException(format("Process '%s' finished with non-zero exit value %d", displayName, exitValue));
// }
return this;
}
@Override
public ExecResult rethrowFailure() throws ExecException {
if (failure != null) {
throw failure;
}
return this;
}
@Override
public String toString() {
return "{exitValue=" + exitValue + ", failure=" + failure + "}";
}
}
private class CompositeStreamsHandler implements StreamsHandler {
@Override
public void connectStreams(Process process, String processName, Executor executor) {
inputHandler.connectStreams(process, processName, executor);
outputHandler.connectStreams(process, processName, executor);
}
@Override
public void start() {
inputHandler.start();
outputHandler.start();
}
@Override
public void stop() {
inputHandler.stop();
outputHandler.stop();
}
@Override
public void disconnect() {
inputHandler.disconnect();
outputHandler.disconnect();
}
}
private class DockerizedProcess extends Process {
private final DockerClient dockerClient;
private final String containerId;
private final Closure afterContainerStop;
private final PipedOutputStream stdInWriteStream = new PipedOutputStream();
private final PipedInputStream stdOutReadStream = new PipedInputStream();
private final PipedInputStream stdErrReadStream = new PipedInputStream();
private final PipedInputStream stdInReadStream = new PipedInputStream(stdInWriteStream);
private final PipedOutputStream stdOutWriteStream = new PipedOutputStream(stdOutReadStream);
private final PipedOutputStream stdErrWriteStream = new PipedOutputStream(stdErrReadStream);
private final CountDownLatch finished = new CountDownLatch(1);
private AtomicInteger exitCode = new AtomicInteger();
private final AttachContainerResultCallback
attachContainerResultCallback =
new AttachContainerResultCallback() {
@Override
public void onNext(Frame frame) {
try {
if (frame.getStreamType().equals(StreamType.STDOUT)) {
stdOutWriteStream.write(frame.getPayload());
} else if (frame.getStreamType().equals(StreamType.STDERR)) {
stdErrWriteStream.write(frame.getPayload());
}
} catch (Exception e) {
LOGGER.error("Error while writing to stream:", e);
}
super.onNext(frame);
}
};
private final WaitContainerResultCallback
waitContainerResultCallback =
new WaitContainerResultCallback() {
@Override
public void onNext(WaitResponse waitResponse) {
exitCode.set(waitResponse.getStatusCode());
try {
attachContainerResultCallback.close();
attachContainerResultCallback.awaitCompletion();
stdOutWriteStream.close();
stdErrWriteStream.close();
} catch (Exception e) {
LOGGER.debug("Error by detaching streams", e);
} finally {
try {
invokeIfNotNull(afterContainerStop, containerId, dockerClient);
} catch (Exception e) {
LOGGER.debug("Exception thrown at invoking afterContainerStop", e);
} finally {
finished.countDown();
}
}
}
};
public DockerizedProcess(final DockerClient dockerClient, final String containerId,
final Closure afterContainerStop) throws Exception {
this.dockerClient = dockerClient;
this.containerId = containerId;
this.afterContainerStop = afterContainerStop;
attachStreams();
dockerClient.waitContainerCmd(containerId).exec(waitContainerResultCallback);
}
private void attachStreams() throws Exception {
dockerClient.attachContainerCmd(containerId)
.withFollowStream(true)
.withStdOut(true)
.withStdErr(true)
.withStdIn(stdInReadStream)
.exec(attachContainerResultCallback);
if (!attachContainerResultCallback.awaitStarted(10, TimeUnit.SECONDS)) {
LOGGER.warn("Not attached to container " + containerId + " within 10secs");
throw new RuntimeException("Not attached to container " + containerId + " within 10secs");
}
}
@Override
public OutputStream getOutputStream() {
return stdInWriteStream;
}
@Override
public InputStream getInputStream() {
return stdOutReadStream;
}
@Override
public InputStream getErrorStream() {
return stdErrReadStream;
}
@Override
public int waitFor() throws InterruptedException {
finished.await();
return exitCode.get();
}
@Override
public int exitValue() {
if (finished.getCount() > 0) {
throw new IllegalThreadStateException("docker process still running");
}
return exitCode.get();
}
@Override
public void destroy() {
dockerClient.killContainerCmd(containerId).exec();
}
@Override
public String toString() {
return "Container " + containerId + " on " + dockerClient.toString();
}
}
}