blob: 8fe8e335d90354362c6fd4a4c47fe0096916b39c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.utils;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.InspectExecResponse;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPOutputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);
private static File getTargetDirectory(String containerId) {
String base = System.getProperty("maven.buildDirectory");
if (base == null) {
base = "target";
}
File directory = new File(base + "/container-logs/" + containerId);
if (!directory.exists() && !directory.mkdirs()) {
LOG.error("Error creating directory for container logs.");
}
return directory;
}
public static void dumpContainerLogToTarget(DockerClient dockerClient, String containerId) {
final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
// docker api returns names prefixed with "/", it's part of it's legacy design,
// this removes it to be consistent with what docker ps shows.
final String containerName = inspectContainerResponse.getName().replace("/","");
File output = new File(getTargetDirectory(containerName), "docker.log");
int i = 0;
while (output.exists()) {
LOG.info("{} exists, incrementing", output);
output = new File(getTargetDirectory(containerName), "docker." + i++ + ".log");
}
try (FileOutputStream os = new FileOutputStream(output)) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
dockerClient.logContainerCmd(containerName).withStdOut(true)
.withStdErr(true).withTimestamps(true).exec(new ResultCallback<Frame>() {
@Override
public void close() {}
@Override
public void onStart(Closeable closeable) {}
@Override
public void onNext(Frame object) {
try {
os.write(object.getPayload());
} catch (IOException e) {
onError(e);
}
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
future.complete(true);
}
});
future.get();
} catch (RuntimeException|ExecutionException|IOException e) {
LOG.error("Error dumping log for {}", containerName, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.info("Interrupted dumping log from container {}", containerName, ie);
}
}
public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
String path) {
final int READ_BLOCK_SIZE = 10000;
final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
// docker api returns names prefixed with "/", it's part of it's legacy design,
// this removes it to be consistent with what docker ps shows.
final String containerName = inspectContainerResponse.getName().replace("/","");
final String baseName = path.replace("/", "-").replaceAll("^-", "");
File output = new File(getTargetDirectory(containerName), baseName + ".tar.gz");
int i = 0;
while (output.exists()) {
LOG.info("{} exists, incrementing", output);
output = new File(getTargetDirectory(containerName), baseName + "_" + i++ + ".tar.gz");
}
try (InputStream dockerStream = dockerClient.copyArchiveFromContainerCmd(containerId, path).exec();
OutputStream os = new GZIPOutputStream(new FileOutputStream(output))) {
byte[] block = new byte[READ_BLOCK_SIZE];
int read = dockerStream.read(block, 0, READ_BLOCK_SIZE);
while (read > -1) {
os.write(block, 0, read);
read = dockerStream.read(block, 0, READ_BLOCK_SIZE);
}
} catch (RuntimeException|IOException e) {
if (!(e instanceof NotFoundException)) {
LOG.error("Error reading dir from container {}", containerName, e);
}
}
}
public static void dumpContainerLogDirToTarget(DockerClient docker, String containerId,
String path) {
final int READ_BLOCK_SIZE = 10000;
try (InputStream dockerStream = docker.copyArchiveFromContainerCmd(containerId, path).exec();
TarArchiveInputStream stream = new TarArchiveInputStream(dockerStream)) {
TarArchiveEntry entry = stream.getNextTarEntry();
while (entry != null) {
if (entry.isFile()) {
File output = new File(getTargetDirectory(containerId), entry.getName().replace("/", "-"));
try (FileOutputStream os = new FileOutputStream(output)) {
byte[] block = new byte[READ_BLOCK_SIZE];
int read = stream.read(block, 0, READ_BLOCK_SIZE);
while (read > -1) {
os.write(block, 0, read);
read = stream.read(block, 0, READ_BLOCK_SIZE);
}
}
}
entry = stream.getNextTarEntry();
}
} catch (RuntimeException|IOException e) {
LOG.error("Error reading logs from container {}", containerId, e);
}
}
public static String getContainerIP(DockerClient docker, String containerId) {
for (Map.Entry<String, ContainerNetwork> e : docker.inspectContainerCmd(containerId)
.exec().getNetworkSettings().getNetworks().entrySet()) {
return e.getValue().getIpAddress();
}
throw new IllegalArgumentException("Container " + containerId + " has no networks");
}
public static ContainerExecResult runCommand(DockerClient docker,
String containerId,
String... cmd)
throws ContainerExecException, ExecutionException, InterruptedException {
try {
return runCommandAsync(docker, containerId, cmd).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof ContainerExecException) {
throw (ContainerExecException) e.getCause();
}
throw e;
}
}
public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient dockerClient,
String containerId,
String... cmd) {
CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
String execId = dockerClient.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
.withAttachStdout(true)
.exec()
.getId();
final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
final String containerName = inspectContainerResponse.getName().replace("/","");
String cmdString = String.join(" ", cmd);
StringBuilder stdout = new StringBuilder();
StringBuilder stderr = new StringBuilder();
dockerClient.execStartCmd(execId).withDetach(false)
.exec(new ResultCallback<Frame>() {
@Override
public void close() {}
@Override
public void onStart(Closeable closeable) {
LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
}
@Override
public void onNext(Frame object) {
LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
if (StreamType.STDOUT == object.getStreamType()) {
stdout.append(new String(object.getPayload(), UTF_8));
} else if (StreamType.STDERR == object.getStreamType()) {
stderr.append(new String(object.getPayload(), UTF_8));
}
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
while (resp.isRunning()) {
try {
Thread.sleep(200);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
resp = dockerClient.inspectExecCmd(execId).exec();
}
int retCode = resp.getExitCode();
ContainerExecResult result = ContainerExecResult.of(
retCode,
stdout.toString(),
stderr.toString()
);
LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
if (retCode != 0) {
LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
containerName, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
} else {
future.complete(result);
}
}
});
return future;
}
public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient dockerClient,
String containerId,
String... cmd) throws ContainerExecException {
CompletableFuture<Boolean> future = new CompletableFuture<>();
String execId = dockerClient.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
.withAttachStdout(true)
.exec()
.getId();
final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
final String containerName = inspectContainerResponse.getName().replace("/","");
String cmdString = String.join(" ", cmd);
ByteBuf stdout = Unpooled.buffer();
ByteBuf stderr = Unpooled.buffer();
dockerClient.execStartCmd(execId).withDetach(false)
.exec(new ResultCallback<Frame>() {
@Override
public void close() {
}
@Override
public void onStart(Closeable closeable) {
LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
}
@Override
public void onNext(Frame object) {
if (StreamType.STDOUT == object.getStreamType()) {
stdout.writeBytes(object.getPayload());
} else if (StreamType.STDERR == object.getStreamType()) {
stderr.writeBytes(object.getPayload());
}
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
future.complete(true);
}
});
future.join();
InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
while (resp.isRunning()) {
try {
Thread.sleep(200);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
resp = dockerClient.inspectExecCmd(execId).exec();
}
int retCode = resp.getExitCode();
byte[] stdoutBytes = new byte[stdout.readableBytes()];
stdout.readBytes(stdoutBytes);
byte[] stderrBytes = new byte[stderr.readableBytes()];
stderr.readBytes(stderrBytes);
ContainerExecResultBytes result = ContainerExecResultBytes.of(
retCode,
stdoutBytes,
stderrBytes);
LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
if (retCode != 0) {
throw new ContainerExecException(cmdString, containerId, null);
}
return result;
}
public static Optional<String> getContainerCluster(DockerClient docker, String containerId) {
return Optional.ofNullable(docker.inspectContainerCmd(containerId)
.exec().getConfig().getLabels().get("cluster"));
}
}