blob: ae0e0b2820d089919a47c2a11b2935c78b5c9223 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.testutil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.TimeZone;
import java.util.regex.Pattern;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonWriter;
import javax.json.JsonWriterFactory;
import javax.json.stream.JsonGenerator;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TClientRequest;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TSessionState;
import org.apache.impala.thrift.TSessionType;
import org.apache.impala.thrift.TUniqueId;
import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
public class TestUtils {
private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);

// Line prefixes with relaxed matching in compareOutput(): when an expected line starts
// with one of these, extra trailing tokens in the actual line are tolerated.
private static final String[] ignoreContentAfter_ = { "HOST:", "LOCATIONS:" };

// Special prefix that designates an expected value specified as a regex rather
// than a literal.
private static final String REGEX_AGAINST_ACTUAL = "regex:";

// Regexes that match various elements in a plan.
// An unsigned integer or decimal number, e.g. "12" or "27.30".
private static final String NUMBER_REGEX = "\\d+(\\.\\d+)?";
// A byte unit with an optional K/M/G/T magnitude, e.g. "B" or "MB".
private static final String BYTE_SUFFIX_REGEX = "[KMGT]?B";
// A number immediately followed by a byte unit, e.g. "8B" or "1.50GB".
private static final String BYTE_VALUE_REGEX = NUMBER_REGEX + BYTE_SUFFIX_REGEX;

// Note: The older Hive Server JDBC driver (Hive .9 and earlier) is named similarly:
// "org.apache.hadoop.hive.jdbc.HiveDriver". However, Impala currently only supports
// the Hive Server 2 JDBC driver (Hive .10 and later).
static final String HIVE_SERVER2_DRIVER_NAME =
    "org.apache.hive.jdbc.HiveDriver";

// The default connection string template to connect to localhost on a given port
// number. The single %d placeholder is the port.
static final String HS2_CONNECTION_TEMPLATE = "jdbc:hive2://localhost:%d/default";
/**
 * A rewrite rule for a single line of output. compareOutput() calls transform() on
 * both the expected and the actual line whenever matches() accepts either of them.
 */
public interface ResultFilter {
  /** Returns true if this filter should be applied to 'input'. */
  boolean matches(String input);

  /** Returns the rewritten form of 'input' that is used for the comparison. */
  String transform(String input);
}
/**
 * Our partition file paths are returned in the format of:
 * hdfs://<host>:<port>/<table>/year=2009/month=4/-47469796--667104359_25784_data.0
 * Everything after the month=4 can vary run to run so we want to filter this out
 * when comparing expected vs actual results. We also want to filter out the
 * host/port because that could vary run to run as well.
 *
 * The prefix passed to the constructor is always treated as a literal string, both
 * when testing whether a line matches and when stripping it from the line.
 */
static class PathFilter implements ResultFilter {
  // Matches the run-dependent file name suffix through to the end of the line.
  private static final Pattern PATH_FILTER = Pattern.compile("-*\\d+--\\d+_\\d+.*$");
  // Matches "//host:port" where the host has at most three dot-separated components.
  private static final Pattern PORT_FILTER =
      Pattern.compile("//\\w+(\\.\\w+)?(\\.\\w+)?:\\d+");

  // Literal prefix (e.g. "hdfs:") that identifies lines this filter applies to.
  private final String filterKey_;
  // Quoted form of filterKey_, so regex metacharacters in the prefix are not
  // interpreted when the prefix is stripped. This keeps transform() consistent with
  // the literal contains() check in matches().
  private final Pattern filterKeyPattern_;

  public PathFilter(String prefix) {
    filterKey_ = prefix;
    filterKeyPattern_ = Pattern.compile(Pattern.quote(prefix));
  }

  /** Returns true if 'input' contains the literal prefix. */
  @Override
  public boolean matches(String input) { return input.contains(filterKey_); }

  /**
   * Removes the first occurrence of the prefix, replaces the file name suffix with a
   * single space and removes any "//host:port" component.
   */
  @Override
  public String transform(String input) {
    String result = filterKeyPattern_.matcher(input).replaceFirst("");
    result = PATH_FILTER.matcher(result).replaceAll(" ");
    return PORT_FILTER.matcher(result).replaceAll("");
  }
}
/**
 * Filter to ignore the value from elements in the format key=value.
 */
public static class IgnoreValueFilter implements ResultFilter {
  // Literal string " <key>=" that introduces the value to be ignored.
  private final String keyPrefix;
  // Matches the literal key prefix immediately followed by the value regex. The key
  // is quoted so that regex metacharacters in it are matched literally.
  private final Pattern keyValuePattern;
  // keyPrefix escaped for use as a replacement string ('$' and '\' are special there).
  private final String replacement;

  /**
   * Create a filter that ignores the value from key value pairs where the key is
   * the literal 'key' value and the value matches 'valueRegex'.
   *
   * @throws java.util.regex.PatternSyntaxException if 'valueRegex' is not a valid
   *     regular expression
   */
  public IgnoreValueFilter(String key, String valueRegex) {
    // Include leading space to avoid matching partial keys, e.g. if key is "bar" we
    // don't want to match "foobar=".
    this.keyPrefix = " " + key + "=";
    this.keyValuePattern = Pattern.compile(Pattern.quote(keyPrefix) + valueRegex);
    this.replacement = java.util.regex.Matcher.quoteReplacement(keyPrefix);
  }

  /** Returns true if 'input' contains " <key>=". */
  @Override
  public boolean matches(String input) { return input.contains(keyPrefix); }

  /** Strips the value from every " <key>=<value>" occurrence, leaving " <key>=". */
  @Override
  public String transform(String input) {
    return keyValuePattern.matcher(input).replaceAll(replacement);
  }
}
/**
 * Filter to ignore the filesystem schemes in the scan node explain output. See
 * {@link org.apache.impala.planner.PlannerTestBase.PlannerTestOption#VALIDATE_SCAN_FS}
 * for more details.
 */
public static final ResultFilter SCAN_NODE_SCHEME_FILTER = new ResultFilter() {
  // Alternation of the filesystem scheme names that get stripped.
  private final String fsSchemes = "(HDFS|S3|LOCAL|ADLS)";
  // Scan node header, i.e. "SCAN <scheme>".
  private final Pattern scanNodeFsScheme = Pattern.compile("SCAN " + fsSchemes);
  // Scan node input summary line. We don't match the size value because the
  // FILE_SIZE_FILTER could remove it.
  private final Pattern scanNodeInputMetadata =
      Pattern.compile(fsSchemes + " partitions=\\d+/\\d+ files=\\d+ size=");

  /** Accepts lines that contain a scan node header or a scan input summary. */
  @Override
  public boolean matches(String input) {
    if (scanNodeInputMetadata.matcher(input).find()) return true;
    return scanNodeFsScheme.matcher(input).find();
  }

  /** Removes every occurrence of a scheme name from the line. */
  @Override
  public String transform(String input) {
    String withoutSchemes = input.replaceAll(fsSchemes, "");
    return withoutSchemes;
  }
};
// File size could vary from run to run. For example, the parquet file header size
// or column metadata size could change if the Impala version changes. That doesn't
// mean anything is wrong with the plan, so we want to filter the file size out.
// Strips the value from " size=<number><byte unit>" entries.
public static final IgnoreValueFilter FILE_SIZE_FILTER =
new IgnoreValueFilter("size", BYTE_VALUE_REGEX);
// Ignore the value of row-size entries, e.g. row-size=8B. Any run of non-whitespace
// characters after the '=' is treated as the value.
public static final IgnoreValueFilter ROW_SIZE_FILTER =
new IgnoreValueFilter("row-size", "\\S+");
// Ignore the value of cardinality entries, e.g. cardinality=27.30K or
// cardinality=unavailable.
public static final IgnoreValueFilter CARDINALITY_FILTER =
new IgnoreValueFilter("cardinality", "\\S+");
// Ignore the exact estimated row count, which depends on the file sizes.
// NOTE(review): unlike the filters above this field is package-private and not final,
// although the same instance is also held by DEFAULT_FILTERS -- confirm that nothing
// reassigns it.
static IgnoreValueFilter SCAN_RANGE_ROW_COUNT_FILTER =
new IgnoreValueFilter("max-scan-range-rows", PrintUtils.METRIC_REGEX);
// Filters that are always applied by compareOutput(), before any caller-supplied
// filters. Must be declared after SCAN_RANGE_ROW_COUNT_FILTER, which it references.
// Note that the second path prefix is "file: " including the trailing space.
private static final List<ResultFilter> DEFAULT_FILTERS = Arrays.<ResultFilter>asList(
SCAN_RANGE_ROW_COUNT_FILTER, new PathFilter("hdfs:"), new PathFilter("file: "));
// Filters that ignore the values of resource requirements that appear in
// "EXTENDED" and above explain plans. Byte-valued entries use BYTE_VALUE_REGEX and
// count-valued entries use NUMBER_REGEX.
public static final List<ResultFilter> RESOURCE_FILTERS = Arrays.<ResultFilter>asList(
new IgnoreValueFilter("mem-estimate", BYTE_VALUE_REGEX),
new IgnoreValueFilter("mem-reservation", BYTE_VALUE_REGEX),
new IgnoreValueFilter("thread-reservation", NUMBER_REGEX),
new IgnoreValueFilter("Memory", BYTE_VALUE_REGEX),
new IgnoreValueFilter("Threads", NUMBER_REGEX));
/**
 * Do a line-by-line comparison of actual and expected output.
 * Comparison of the individual lines ignores whitespace: lines are split into
 * whitespace-separated tokens and compared token by token.
 *
 * Before a pair of lines is compared, every default filter and every filter in
 * 'lineFilters' that matches either the expected or the actual line is applied to both
 * lines. Once any filter has been applied to a pair, an actual token is accepted if it
 * contains the expected token rather than only if it is equal to it.
 *
 * If a (filtered) expected line starts with "regex:", the remainder of that line is
 * trimmed and used as a regular expression that the whole (filtered) actual line must
 * match.
 *
 * If a (filtered) expected line starts with "HOST:" or "LOCATIONS:", extra trailing
 * tokens in the actual line are ignored.
 *
 * If orderMatters is false, we consider actual to match expected if they both contain
 * the same output lines regardless of order. In that case both 'actual' and 'expected'
 * are sorted in place, i.e. the caller's lists are modified.
 *
 * @param actual the lines that were produced
 * @param expected the lines that were expected
 * @param orderMatters whether the lines must appear in the same order
 * @param lineFilters additional filters to apply; null is treated as an empty list
 * @return an error message if actual does not match expected, "" otherwise.
 */
public static String compareOutput(ArrayList<String> actual, ArrayList<String> expected,
    boolean orderMatters, List<ResultFilter> lineFilters) {
  if (!orderMatters) {
    Collections.sort(actual);
    Collections.sort(expected);
  }
  List<ResultFilter> callerFilters =
      lineFilters == null ? Collections.<ResultFilter>emptyList() : lineFilters;

  // Index of the first line in actual that does not match, or -1 if there is none.
  int mismatch = -1;
  int commonLen = Math.min(actual.size(), expected.size());
  for (int i = 0; i < commonLen; ++i) {
    if (!lineMatches(actual.get(i), expected.get(i), callerFilters)) {
      mismatch = i;
      break;
    }
  }

  if (mismatch != -1) {
    // print actual and expected, highlighting mismatch
    StringBuilder output =
        new StringBuilder("Actual does not match expected result:\n");
    appendLines(output, actual, 0, mismatch + 1);
    // underline mismatched line with "^^^..."
    for (int i = 0; i < actual.get(mismatch).length(); ++i) {
      output.append('^');
    }
    output.append("\n");
    appendLines(output, actual, mismatch + 1, actual.size());
    output.append("\nExpected:\n");
    appendLines(output, expected, 0, expected.size());
    return output.toString();
  }
  if (actual.size() < expected.size()) {
    // actual is a prefix of expected
    StringBuilder output =
        new StringBuilder("Actual result is missing lines:\n");
    appendLines(output, actual, 0, actual.size());
    output.append("Missing:\n");
    appendLines(output, expected, actual.size(), expected.size());
    return output.toString();
  }
  if (actual.size() > expected.size()) {
    // print actual and expected
    StringBuilder output =
        new StringBuilder("Actual result contains extra output:\n");
    appendLines(output, actual, 0, actual.size());
    output.append("\nExpected:\n");
    appendLines(output, expected, 0, expected.size());
    return output.toString();
  }
  return "";
}

/**
 * Returns true if a single actual line matches the corresponding expected line after
 * applying the default filters followed by 'lineFilters'.
 */
private static boolean lineMatches(String actualStr, String expectedStr,
    List<ResultFilter> lineFilters) {
  // Apply all default and caller-supplied filters to the expected and actual output.
  boolean containsPrefix = false;
  for (List<ResultFilter> filters:
      Arrays.<List<ResultFilter>>asList(DEFAULT_FILTERS, lineFilters)) {
    for (ResultFilter filter: filters) {
      if (filter.matches(expectedStr) || filter.matches(actualStr)) {
        containsPrefix = true;
        expectedStr = filter.transform(expectedStr);
        actualStr = filter.transform(actualStr);
      }
    }
  }

  if (expectedStr.startsWith(REGEX_AGAINST_ACTUAL)) {
    // Strip only the leading marker. String.replace() would also delete any later
    // occurrence of the marker text from inside the regex itself.
    String regex = expectedStr.substring(REGEX_AGAINST_ACTUAL.length()).trim();
    return actualStr.matches(regex);
  }

  boolean ignoreAfter = false;
  for (String marker : ignoreContentAfter_) {
    ignoreAfter |= expectedStr.startsWith(marker);
  }

  // do a whitespace-insensitive comparison
  try (Scanner e = new Scanner(expectedStr);
       Scanner a = new Scanner(actualStr)) {
    while (a.hasNext() && e.hasNext()) {
      String aToken = a.next();
      String eToken = e.next();
      boolean tokensMatch =
          containsPrefix ? aToken.contains(eToken) : aToken.equals(eToken);
      if (!tokensMatch) return false;
    }
    if (ignoreAfter) {
      // Extra actual tokens are fine, but every expected token must have been seen.
      return !(e.hasNext() && !a.hasNext());
    }
    // Both lines must run out of tokens at the same time.
    return a.hasNext() == e.hasNext();
  }
}

/** Appends lines[from, to) to 'out', each followed by a newline. */
private static void appendLines(StringBuilder out, List<String> lines, int from,
    int to) {
  for (int i = from; i < to; ++i) {
    out.append(lines.get(i)).append("\n");
  }
}
/**
 * Create a TQueryCtx for executing FE tests, using the catalog's default database and
 * the user named by the "user.name" system property.
 */
public static TQueryCtx createQueryContext() {
  String currentUser = System.getProperty("user.name");
  return createQueryContext(Catalog.DEFAULT_DB, currentUser);
}
/**
 * Create a TQueryCtx for executing FE tests using the given default DB and user, with
 * fresh query options in which expr rewrites are turned off.
 */
public static TQueryCtx createQueryContext(String defaultDb, String user) {
  TQueryOptions freshOptions = new TQueryOptions();
  TQueryCtx ctx = createQueryContext(defaultDb, user, freshOptions);
  // Disable rewrites by default because some analyzer tests have non-executable
  // constant exprs (e.g. dummy UDFs) that do not work with constant folding.
  ctx.client_request.query_options.setEnable_expr_rewrites(false);
  return ctx;
}
/**
 * Create a TQueryCtx for executing FE tests with the given query options, using the
 * catalog's default database and the user named by the "user.name" system property.
 */
public static TQueryCtx createQueryContext(TQueryOptions options) {
  String currentUser = System.getProperty("user.name");
  return createQueryContext(Catalog.DEFAULT_DB, currentUser, options);
}
/**
 * Create a TQueryCtx for executing FE tests using the given default DB, user and
 * query options. 'options' is passed through as given; this overload does not change
 * any option itself (the two-argument overload is the one that disables expr
 * rewrites).
 *
 * The context gets a fresh query id and a BEESWAX session on localhost. The "now"
 * string is formatted in the JVM's default time zone and the UTC timestamp string in
 * GMT, both from the same instant, which is also used as the start time.
 */
public static TQueryCtx createQueryContext(String defaultDb,
    String user, TQueryOptions options) {
  TQueryCtx queryCtx = new TQueryCtx();
  queryCtx.setClient_request(new TClientRequest("FeTests", options));
  queryCtx.setQuery_id(new TUniqueId());
  queryCtx.setSession(new TSessionState(new TUniqueId(), TSessionType.BEESWAX,
      defaultDb, user, new TNetworkAddress("localhost", 0)));
  // SimpleDateFormat's 'S' is the millisecond *number*, zero-padded on the left to the
  // pattern width, so "SSSSSSSSS" would print 123ms as ".000000123". Format the three
  // millisecond digits and append literal zeros to get a correct 9-digit fraction.
  SimpleDateFormat formatter =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS'000000'");
  Date now = new Date();
  queryCtx.setNow_string(formatter.format(now));
  formatter.setTimeZone(TimeZone.getTimeZone("GMT"));
  queryCtx.setUtc_timestamp_string(formatter.format(now));
  // NOTE(review): the local time zone is reported as "UTC" although the now string
  // above uses the JVM default zone -- confirm this is intended.
  queryCtx.setLocal_time_zone("UTC");
  queryCtx.setStart_unix_millis(now.getTime());
  queryCtx.setPid(1000);
  return queryCtx;
}
/**
 * Pretty print a JSON string.
 *
 * @param json text of a single JSON object
 * @return the pretty-printed form of 'json', or "" if it could not be read as a JSON
 *     object or could not be written; the failure is logged in that case
 */
public static String prettyPrintJson(String json) {
  StringWriter sw = new StringWriter();
  try {
    JsonObject jobj;
    // Close the reader as soon as the object has been read.
    try (JsonReader jr = Json.createReader(new StringReader(json))) {
      jobj = jr.readObject();
    }
    Map<String, Object> properties = Maps.newHashMap();
    properties.put(JsonGenerator.PRETTY_PRINTING, true);
    JsonWriterFactory writerFactory = Json.createWriterFactory(properties);
    // The writer must be closed before the buffered text is read back from 'sw'.
    try (JsonWriter jsonWriter = writerFactory.createWriter(sw)) {
      jsonWriter.writeObject(jobj);
    }
  } catch (Exception e) {
    // Pass the exception itself so that the stack trace is not lost.
    LOG.error(String.format("Error pretty printing JSON string %s: %s", json,
        e.getMessage()), e);
    return "";
  }
  return sw.toString();
}
/**
 * States a JUnit assumption that Kudu is supported, as reported by RuntimeEnv.
 */
public static void assumeKuduIsSupported() {
  boolean kuduSupported = RuntimeEnv.INSTANCE.isKuduSupported();
  Assume.assumeTrue(kuduSupported);
}
/**
 * Returns the Hive major version from the IMPALA_HIVE_MAJOR_VERSION environment
 * variable.
 *
 * @throws NullPointerException if the environment variable is not set
 * @throws NumberFormatException if its value is not an integer
 */
public static int getHiveMajorVersion() {
  String hiveMajorVersion = Preconditions.checkNotNull(
      System.getenv("IMPALA_HIVE_MAJOR_VERSION"),
      "Environment variable IMPALA_HIVE_MAJOR_VERSION is not set");
  // Tolerate surrounding whitespace in the exported value.
  return Integer.parseInt(hiveMajorVersion.trim());
}
/**
 * Checks whether the catalog server running on the given host and port has catalog-v2
 * enabled, by reading the "catalog_topic_mode" flag from its /varz?json page.
 *
 * @param host host name of the catalog server; must not be null
 * @param port web server port of the catalog server; must not be negative
 * @return true if the flag's current value is "minimal"
 * @throws IOException if the flags cannot be fetched or parsed
 * @throws NullPointerException if the flag is not found in the output
 */
public static boolean isCatalogV2Enabled(String host, int port) throws IOException {
  Preconditions.checkNotNull(host);
  Preconditions.checkState(port >= 0);
  String varzLocation = String.format("http://%s:%s"
      + "/varz?json", host, port);
  URL varzUrl = new URL(varzLocation);
  String topicMode = getConfigValue(varzUrl, "catalog_topic_mode");
  Preconditions.checkNotNull(topicMode);
  boolean isMinimal = topicMode.equals("minimal");
  return isMinimal;
}
/**
 * Gets a flag value from the given URL. Useful for scrubbing the catalog/coordinator
 * varz json output to look for interesting configurations.
 *
 * The document at 'url' is parsed as a JSON object. Its "flags" member is expected to
 * be a list of objects; the "current" member of the first one whose "name" equals
 * 'key' is returned.
 *
 * @param url location of the JSON document
 * @param key name of the flag to look up
 * @return the flag's current value as a string, or null if there is no "flags" list,
 *     no flag named 'key', or the flag has no current value
 * @throws IOException if the document cannot be fetched or parsed
 */
private static String getConfigValue(URL url, String key) throws IOException {
  Map<?, ?> map = new ObjectMapper().readValue(url, Map.class);
  Object flags = map.get("flags");
  // Inspect the parsed value instead of casting to concrete collection classes, so a
  // document with an unexpected shape yields null rather than a ClassCastException.
  if (!(flags instanceof List)) return null;
  for (Object entry : (List<?>) flags) {
    if (!(entry instanceof Map)) continue;
    Map<?, ?> flag = (Map<?, ?>) entry;
    Object name = flag.get("name");
    if (name != null && name.equals(key)) {
      Object current = flag.get("current");
      return current == null ? null : current.toString();
    }
  }
  return null;
}
/**
 * Returns a random alphanumeric string with 'size' characters.
 */
public static String getRandomString(int size) {
  String randomText = RandomStringUtils.randomAlphanumeric(size);
  return randomText;
}
}