blob: 415c1b3e373ef47b33ff72dedaba8eff527cba26 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.tests.integration.utils;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.InspectExecResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.Frame;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Docker utilities for integration tests.
*/
public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);
private static File getTargetDirectory(String containerId) {
String base = System.getProperty("maven.buildDirectory");
if (base == null) {
base = "target";
}
File directory = new File(base + "/container-logs/" + containerId);
if (!directory.exists() && !directory.mkdirs()) {
LOG.error("Error creating directory for container logs.");
}
return directory;
}
public static void dumpContainerLogToTarget(DockerClient docker, String containerId) {
File output = new File(getTargetDirectory(containerId), "docker.log");
try (FileOutputStream os = new FileOutputStream(output)) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
docker.logContainerCmd(containerId).withStdOut(true)
.withStdErr(true).withTimestamps(true).exec(new ResultCallback<Frame>() {
@Override
public void close() {}
@Override
public void onStart(Closeable closeable) {}
@Override
public void onNext(Frame object) {
try {
os.write(object.getPayload());
} catch (IOException e) {
onError(e);
}
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
future.complete(true);
}
});
future.get();
} catch (RuntimeException | ExecutionException | IOException e) {
LOG.error("Error dumping log for {}", containerId, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.info("Interrupted dumping log from container {}", containerId, ie);
}
}
public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
String path) {
final int readBlockSize = 10000;
InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
// docker api returns names prefixed with "/", it's part of it's legacy design,
// this removes it to be consistent with what docker ps shows.
final String containerName = inspectContainerResponse.getName().replace("/", "");
File output = new File(getTargetDirectory(containerName),
(path.replace("/", "-") + ".tar.gz")
.replaceAll("^-", ""));
try (InputStream dockerStream = dockerClient.copyArchiveFromContainerCmd(containerId, path).exec();
OutputStream os = new GZIPOutputStream(new FileOutputStream(output))) {
byte[] block = new byte[readBlockSize];
int read = dockerStream.read(block, 0, readBlockSize);
while (read > -1) {
os.write(block, 0, read);
read = dockerStream.read(block, 0, readBlockSize);
}
} catch (RuntimeException | IOException e) {
LOG.error("Error reading dir from container {}", containerName, e);
}
}
public static void dumpContainerLogDirToTarget(DockerClient docker, String containerId, String path) {
final int readBlockSize = 10000;
try (InputStream dockerStream = docker.copyArchiveFromContainerCmd(containerId, path).exec();
TarArchiveInputStream stream = new TarArchiveInputStream(dockerStream)) {
TarArchiveEntry entry = stream.getNextTarEntry();
while (entry != null) {
if (entry.isFile()) {
File output = new File(getTargetDirectory(containerId), entry.getName().replace("/", "-"));
try (FileOutputStream os = new FileOutputStream(output)) {
byte[] block = new byte[readBlockSize];
int read = stream.read(block, 0, readBlockSize);
while (read > -1) {
os.write(block, 0, read);
read = stream.read(block, 0, readBlockSize);
}
}
}
entry = stream.getNextTarEntry();
}
} catch (RuntimeException | IOException e) {
LOG.error("Error reading bk logs from container {}", containerId, e);
}
}
public static String getContainerIP(DockerClient docker, String containerId) {
for (Map.Entry<String, ContainerNetwork> e : docker.inspectContainerCmd(containerId)
.exec().getNetworkSettings().getNetworks().entrySet()) {
return e.getValue().getIpAddress();
}
throw new IllegalArgumentException("Container " + containerId + " has no networks");
}
public static String runCommand(DockerClient docker, String containerId, String... cmd) throws Exception {
return runCommand(docker, containerId, false, cmd);
}
public static String runCommand(DockerClient docker, String containerId, boolean ignoreError, String... cmd)
throws Exception {
CompletableFuture<Boolean> future = new CompletableFuture<>();
String execid = docker.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
.withAttachStdout(true)
.exec()
.getId();
String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
StringBuffer output = new StringBuffer();
docker.execStartCmd(execid).withDetach(false).exec(new ResultCallback<Frame>() {
@Override
public void close() {}
@Override
public void onStart(Closeable closeable) {
LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
}
@Override
public void onNext(Frame object) {
LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object);
output.append(new String(object.getPayload(), UTF_8));
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
future.complete(true);
}
});
future.get();
InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
while (resp.isRunning()) {
Thread.sleep(200);
resp = docker.inspectExecCmd(execid).exec();
}
long retCode = resp.getExitCodeLong();
if (retCode != 0) {
LOG.error("DOCKER.exec({}:{}): failed with {} : {}", containerId, cmdString, retCode, output);
if (!ignoreError) {
throw new Exception(String.format("cmd(%s) failed on %s with exitcode %d",
cmdString, containerId, retCode));
}
} else {
LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
}
return output.toString();
}
public static Set<String> cubeIdsMatching(String needle) {
Pattern pattern = Pattern.compile("^arq.cube.docker.([^.]*).ip$");
return System.getProperties().keySet().stream()
.map(k -> pattern.matcher(k.toString()))
.filter(m -> m.matches())
.map(m -> m.group(1))
.filter(m -> m.contains(needle))
.collect(Collectors.toSet());
}
}