/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ;

import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.collection.Pair;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.DockerCmdExecFactory;
import com.github.dockerjava.api.command.ExecCreateCmd;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import com.github.dockerjava.jaxrs.JerseyDockerCmdExecFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
/**
 * Base class for integration tests: provides helpers to run commands inside the docker
 * containers and to generate test data.
 */
public abstract class ITTestBase {
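  // Illustrative sketch (hypothetical, not part of this file; imports omitted): a concrete
  // integration test extends this base class and drives the containers through the helpers
  // below, e.g.
  //
  //   public class ITTestHiveQuery extends ITTestBase {
  //     @Test
  //     public void testShowTables() throws Exception {
  //       Pair<String, String> stdOutErr = executeHiveCommand("show tables");
  //       assertStdOutContains(stdOutErr, "some_table"); // table name is hypothetical
  //     }
  //   }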
public static final Logger LOG = LogManager.getLogger(ITTestBase.class);
protected static final String SPARK_WORKER_CONTAINER = "/spark-worker-1";
protected static final String ADHOC_1_CONTAINER = "/adhoc-1";
protected static final String ADHOC_2_CONTAINER = "/adhoc-2";
protected static final String HIVESERVER = "/hiveserver";
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
protected static final String HUDI_HIVE_SYNC_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hive-sync-bundle.jar";
protected static final String HUDI_SPARK_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-spark-bundle.jar";
protected static final String HUDI_UTILITIES_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
protected static final String PRESTO_COORDINATOR_URL = "presto-coordinator-1:8090";
protected static final String HADOOP_CONF_DIR = "/etc/hadoop";
private static final String DEFAULT_DOCKER_HOST = "unix:///var/run/docker.sock";
private static final String OVERRIDDEN_DOCKER_HOST = System.getenv("DOCKER_HOST");
protected DockerClient dockerClient;
protected Map<String, Container> runningContainers;
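  /**
   * Builds the argument array for a Hive CLI invocation inside the hiveserver container.
   * The Hudi hadoop-mr bundle jar is added before the raw command so Hive can read Hudi
   * tables, and stats auto-gathering is disabled.
   */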
static String[] getHiveConsoleCommand(String rawCommand) {
String jarCommand = "add jar " + HUDI_HADOOP_BUNDLE + ";";
String fullCommand = jarCommand + rawCommand;
List<String> cmd = new ArrayList<>();
cmd.add("hive");
cmd.add("--hiveconf");
cmd.add("hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
cmd.add("--hiveconf");
cmd.add("hive.stats.autogather=false");
cmd.add("-e");
cmd.add("\"" + fullCommand + "\"");
return cmd.toArray(new String[0]);
}
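  /**
   * Builds a beeline command that runs the given command file against the Hive server,
   * passing the Hudi hadoop-mr bundle and any additional hivevar to the script.
   */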
private static String getHiveConsoleCommandFile(String commandFile, String additionalVar) {
StringBuilder builder = new StringBuilder().append("beeline -u " + HIVE_SERVER_JDBC_URL)
.append(" --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat ")
.append(" --hiveconf hive.stats.autogather=false ")
.append(" --hivevar hudi.hadoop.bundle=" + HUDI_HADOOP_BUNDLE);
if (additionalVar != null) {
builder.append(" --hivevar " + additionalVar + " ");
}
return builder.append(" -f ").append(commandFile).toString();
}
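  /**
   * Builds a spark-shell command that runs the given scala command file in local mode
   * with the Hudi spark bundle and the spark-avro package on the classpath.
   */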
static String getSparkShellCommand(String commandFile) {
return new StringBuilder().append("spark-shell --jars ").append(HUDI_SPARK_BUNDLE)
.append(" --master local[2] --driver-class-path ").append(HADOOP_CONF_DIR)
.append(
" --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 1 ")
.append(" --packages org.apache.spark:spark-avro_2.11:2.4.4 ").append(" -i ").append(commandFile).toString();
}
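  /**
   * Builds a presto CLI command that runs the given command file against the coordinator,
   * using the hive catalog and the default schema.
   */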
static String getPrestoConsoleCommand(String commandFile) {
return new StringBuilder().append("presto --server " + PRESTO_COORDINATOR_URL)
.append(" --catalog hive --schema default")
.append(" -f " + commandFile).toString();
}
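  /**
   * Creates the docker client (honoring an overridden DOCKER_HOST if set) and waits up to
   * 60 seconds for all containers of the integration-test setup to be running.
   */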
@BeforeEach
public void init() {
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
// Assuming insecure docker engine
DockerClientConfig config =
DefaultDockerClientConfig.createDefaultConfigBuilder().withDockerHost(dockerHost).build();
// using jaxrs/jersey implementation here (netty impl is also available)
DockerCmdExecFactory dockerCmdExecFactory = new JerseyDockerCmdExecFactory().withConnectTimeout(1000)
.withMaxTotalConnections(100).withMaxPerRouteConnections(10);
dockerClient = DockerClientBuilder.getInstance(config).withDockerCmdExecFactory(dockerCmdExecFactory).build();
await().atMost(60, SECONDS).until(this::servicesUp);
}
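  /**
   * Returns true once every container is in the "running" state, caching the containers
   * by name in {@link #runningContainers} for later lookups.
   */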
private boolean servicesUp() {
List<Container> containerList = dockerClient.listContainersCmd().exec();
for (Container c : containerList) {
if (!c.getState().equalsIgnoreCase("running")) {
LOG.info("Container : " + Arrays.toString(c.getNames()) + "not in running state, Curr State :" + c.getState());
return false;
}
}
runningContainers = containerList.stream().map(c -> Pair.of(c.getNames()[0], c))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
return true;
}
private String singleSpace(String str) {
return str.replaceAll("[\\s]+", " ");
}
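  /**
   * Executes the given command in the named container, waits for completion (bounded by a
   * 9-minute timeout), asserts on the exit code, and returns a callback holding the
   * captured stdout and stderr.
   */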
private TestExecStartResultCallback executeCommandInDocker(String containerName, String[] command,
boolean expectedToSucceed) throws Exception {
    Container targetContainer = runningContainers.get(containerName);
    ExecCreateCmd cmd = dockerClient.execCreateCmd(targetContainer.getId()).withCmd(command).withAttachStdout(true)
        .withAttachStderr(true);
ExecCreateCmdResponse createCmdResponse = cmd.exec();
TestExecStartResultCallback callback =
new TestExecStartResultCallback(new ByteArrayOutputStream(), new ByteArrayOutputStream());
    // Each execution of command(s) in docker is given up to 9 minutes to complete. Otherwise, it is deemed stuck.
    // We will try to capture stdout and stderr of the stuck process.
boolean completed =
dockerClient.execStartCmd(createCmdResponse.getId()).withDetach(false).withTty(false).exec(callback)
.awaitCompletion(540, SECONDS);
if (!completed) {
callback.getStderr().flush();
callback.getStdout().flush();
LOG.error("\n\n ###### Timed Out Command : " + Arrays.asList(command));
LOG.error("\n\n ###### Stderr of timed-out command #######\n" + callback.getStderr().toString());
LOG.error("\n\n ###### stdout of timed-out command #######\n" + callback.getStdout().toString());
      throw new TimeoutException("Command " + Arrays.toString(command) + " has been running for more than 9 minutes. "
          + "Killing and failing !!");
}
int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
LOG.info("Exit code for command : " + exitCode);
if (exitCode != 0) {
LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
}
LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
if (expectedToSucceed) {
assertEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")");
} else {
assertNotEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")");
}
cmd.close();
return callback;
}
void executeCommandStringsInDocker(String containerName, List<String> commands) throws Exception {
for (String cmd : commands) {
executeCommandStringInDocker(containerName, cmd, true);
}
}
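  /**
   * Splits a whitespace-separated command string and executes it in the named container.
   * Note: this naive split does not preserve quoted arguments containing spaces.
   */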
protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
throws Exception {
LOG.info("\n\n#################################################################################################");
LOG.info("Container : " + containerName + ", Running command :" + cmd);
LOG.info("\n#################################################################################################");
String[] cmdSplits = singleSpace(cmd).split(" ");
return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed);
}
protected Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
LOG.info("\n\n#################################################################################################");
LOG.info("Running hive command :" + hiveCommand);
LOG.info("\n#################################################################################################");
String[] hiveCmd = getHiveConsoleCommand(hiveCommand);
TestExecStartResultCallback callback = executeCommandInDocker(HIVESERVER, hiveCmd, true);
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
}
Pair<String, String> executeHiveCommandFile(String commandFile) throws Exception {
return executeHiveCommandFile(commandFile, null);
}
Pair<String, String> executeHiveCommandFile(String commandFile, String additionalVar) throws Exception {
String hiveCmd = getHiveConsoleCommandFile(commandFile, additionalVar);
TestExecStartResultCallback callback = executeCommandStringInDocker(HIVESERVER, hiveCmd, true);
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
}
Pair<String, String> executeSparkSQLCommand(String commandFile, boolean expectedToSucceed) throws Exception {
String sparkShellCmd = getSparkShellCommand(commandFile);
TestExecStartResultCallback callback =
executeCommandStringInDocker(ADHOC_1_CONTAINER, sparkShellCmd, expectedToSucceed);
return Pair.of(callback.getStdout().toString(), callback.getStderr().toString());
}
Pair<String, String> executePrestoCommandFile(String commandFile) throws Exception {
String prestoCmd = getPrestoConsoleCommand(commandFile);
TestExecStartResultCallback callback = executeCommandStringInDocker(PRESTO_COORDINATOR, prestoCmd, true);
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
}
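  /**
   * Copies a local file into the presto coordinator container at the given remote path.
   */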
void executePrestoCopyCommand(String fromFile, String remotePath) {
    Container prestoCoordinatorContainer = runningContainers.get(PRESTO_COORDINATOR);
    dockerClient.copyArchiveToContainerCmd(prestoCoordinatorContainer.getId())
.withHostResource(fromFile)
.withRemotePath(remotePath)
.exec();
}
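  /**
   * Greps the Hive server log for exceptions and saves the matching snippet to a
   * timestamped file under java.io.tmpdir for post-mortem inspection.
   */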
private void saveUpLogs() {
try {
      // capture exceptions from the Hive server log; run through a shell so the pipe is interpreted,
      // and always exit 0 so a log without exceptions does not trip the exit-code assertion
      String[] grepLogCmd = {"/bin/bash", "-c", "cat /tmp/root/hive.log | grep -i exception -A 10 -B 5 || true"};
      String hiveLogStr = executeCommandInDocker(HIVESERVER, grepLogCmd, true).getStdout().toString();
      String filePath = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis() + "-hive.log";
      FileIOUtils.writeStringToFile(hiveLogStr, filePath);
      LOG.info("Hive log saved at: " + filePath);
      LOG.info("<=========== Exceptions in hive log ===============>\n"
          + "\n" + hiveLogStr
          + "\n <==========================================>");
    } catch (Exception e) {
      LOG.error("Unable to save logs", e);
}
}
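  /**
   * Asserts that the expected string occurs exactly once (or, via the overload below, a
   * given number of times) in the captured stdout, ignoring whitespace. On mismatch, the
   * hive logs are saved for debugging before the assertion fires.
   */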
void assertStdOutContains(Pair<String, String> stdOutErr, String expectedOutput) {
assertStdOutContains(stdOutErr, expectedOutput, 1);
}
void assertStdOutContains(Pair<String, String> stdOutErr, String expectedOutput, int times) {
    // strip all whitespace so that changes in padding do not affect the comparison
    String stdOutSingleSpaced = singleSpace(stdOutErr.getLeft()).replaceAll(" ", "");
    expectedOutput = singleSpace(expectedOutput).replaceAll(" ", "");
int lastIndex = 0;
int count = 0;
while (lastIndex != -1) {
lastIndex = stdOutSingleSpaced.indexOf(expectedOutput, lastIndex);
if (lastIndex != -1) {
count++;
lastIndex += expectedOutput.length();
}
}
if (times != count) {
saveUpLogs();
}
assertEquals(times, count, "Did not find output the expected number of times.");
}
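  /**
   * Result callback that captures stdout and stderr of a docker exec into in-memory
   * buffers and exposes them to the test.
   */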
public class TestExecStartResultCallback extends ExecStartResultCallback {
// Storing the reference in subclass to expose to clients
private final ByteArrayOutputStream stdout;
private final ByteArrayOutputStream stderr;
public TestExecStartResultCallback(ByteArrayOutputStream stdout, ByteArrayOutputStream stderr) {
super(stdout, stderr);
this.stdout = stdout;
this.stderr = stderr;
}
@Override
public void onComplete() {
super.onComplete();
LOG.info("onComplete called");
try {
stderr.flush();
stdout.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public ByteArrayOutputStream getStdout() {
return stdout;
}
public ByteArrayOutputStream getStderr() {
return stderr;
}
}
}