blob: 6f646ac3751625642d8d2738bfd8f0977391b8a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.hive;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.common.io.DigestPrintStream;
import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
import org.apache.hadoop.hive.common.io.SortPrintStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.util.StreamPrinter;
import org.apache.tools.ant.BuildException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.StringWriter;
import java.net.URL;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* HiveTestUtil cloned from Hive QTestUtil. Can be outdated and may require update once a problem
* found.
*/
public class HiveTestUtil {
public static final String UTF_8 = "UTF-8";
private static final Log LOG = LogFactory.getLog("HiveTestUtil");
private static final String QTEST_LEAVE_FILES = "QTEST_LEAVE_FILES";
public static final String DEFAULT_DATABASE_NAME = "default";
private String testWarehouse;
private final String testFiles;
protected final String outDir;
protected final String logDir;
private final TreeMap<String, String> qMap;
private final Set<String> qSkipSet;
private final Set<String> qSortSet;
private final Set<String> qSortQuerySet;
private final Set<String> qHashQuerySet;
private final Set<String> qSortNHashQuerySet;
private final Set<String> qJavaVersionSpecificOutput;
private static final String SORT_SUFFIX = ".sorted";
private static MiniClusterType clusterType = MiniClusterType.none;
private ParseDriver pd;
protected Hive db;
protected HiveConf conf;
private BaseSemanticAnalyzer sem;
protected final boolean overWrite;
private CliDriver cliDriver;
private HadoopShims.MiniMrShim mr = null;
private HadoopShims.MiniDFSShim dfs = null;
private String hadoopVer = null;
private HiveTestSetup setup = null;
private boolean isSessionStateStarted = false;
private static final String javaVersion = getJavaVersion();
private String initScript = "";
private String cleanupScript = "";
public HiveConf getConf() {
return conf;
}
public boolean deleteDirectory(File path) {
if (path.exists()) {
File[] files = path.listFiles();
for (File file : files) {
if (file.isDirectory()) {
deleteDirectory(file);
} else {
file.delete();
}
}
}
return (path.delete());
}
public void copyDirectoryToLocal(Path src, Path dest) throws Exception {
FileSystem srcFs = src.getFileSystem(conf);
FileSystem destFs = dest.getFileSystem(conf);
if (srcFs.exists(src)) {
FileStatus[] files = srcFs.listStatus(src);
for (FileStatus file : files) {
String name = file.getPath().getName();
Path dfs_path = file.getPath();
Path local_path = new Path(dest, name);
if (file.isDir()) {
if (!destFs.exists(local_path)) {
destFs.mkdirs(local_path);
}
copyDirectoryToLocal(dfs_path, local_path);
} else {
srcFs.copyToLocalFile(dfs_path, local_path);
}
}
}
}
static Pattern mapTok = Pattern.compile("(\\.?)(.*)_map_(.*)");
static Pattern reduceTok = Pattern.compile("(.*)(reduce_[^\\.]*)((\\..*)?)");
public void normalizeNames(File path) throws Exception {
if (path.isDirectory()) {
File[] files = path.listFiles();
for (File file : files) {
normalizeNames(file);
}
} else {
Matcher m = reduceTok.matcher(path.getName());
if (m.matches()) {
String name = m.group(1) + "reduce" + m.group(3);
path.renameTo(new File(path.getParent(), name));
} else {
m = mapTok.matcher(path.getName());
if (m.matches()) {
String name = m.group(1) + "map_" + m.group(3);
path.renameTo(new File(path.getParent(), name));
}
}
}
}
public String getOutputDirectory() {
return outDir;
}
public String getLogDirectory() {
return logDir;
}
private String getHadoopMainVersion(String input) {
if (input == null) {
return null;
}
Pattern p = Pattern.compile("^(\\d+\\.\\d+).*");
Matcher m = p.matcher(input);
if (m.matches()) {
return m.group(1);
}
return null;
}
public void initConf() throws Exception {
// Plug verifying metastore in for testing.
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
"org.apache.hadoop.hive.metastore.VerifyingObjectStore");
if (mr != null) {
assert dfs != null;
mr.setupConfiguration(conf);
// set fs.default.name to the uri of mini-dfs
String dfsUriString = WindowsPathUtil.getHdfsUriString(dfs.getFileSystem().getUri()
.toString());
conf.setVar(HiveConf.ConfVars.HADOOPFS, dfsUriString);
// hive.metastore.warehouse.dir needs to be set relative to the mini-dfs
conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
(new Path(dfsUriString,
"/build/ql/test/data/warehouse/")).toString());
}
// Windows paths should be converted after MiniMrShim.setupConfiguration()
// since setupConfiguration may overwrite configuration values.
if (Shell.WINDOWS) {
WindowsPathUtil.convertPathsFromWindowsToHdfs(conf);
}
}
public enum MiniClusterType {
mr,
tez,
none;
public static MiniClusterType valueForString(String type) {
if (type.equals("miniMR")) {
return mr;
} else if (type.equals("tez")) {
return tez;
} else {
return none;
}
}
}
public HiveTestUtil(String outDir, String logDir, MiniClusterType clusterType, String hadoopVer)
throws Exception {
this(outDir, logDir, clusterType, null, hadoopVer);
}
public HiveTestUtil(String outDir, String logDir, MiniClusterType clusterType, String confDir,
String hadoopVer)
throws Exception {
this.outDir = outDir;
this.logDir = logDir;
if (confDir != null && !confDir.isEmpty()) {
HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath()
+ "/hive-site.xml"));
LOG.info("Setting hive-site: " + HiveConf.getHiveSiteLocation());
}
conf = new HiveConf();
String tmpBaseDir = System.getProperty("test.tmp.dir");
if (tmpBaseDir == null || tmpBaseDir == "") {
tmpBaseDir = System.getProperty("java.io.tmpdir");
}
String metaStoreURL = "jdbc:derby:" + tmpBaseDir + File.separator + "metastore_dbtest;" +
"create=true";
conf.set(ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL);
System.setProperty(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL);
//set where derby logs
File derbyLogFile = new File(tmpBaseDir + "/derby.log");
derbyLogFile.createNewFile();
System.setProperty("derby.stream.error.file", derbyLogFile.getPath());
this.hadoopVer = getHadoopMainVersion(hadoopVer);
qMap = new TreeMap<String, String>();
qSkipSet = new HashSet<String>();
qSortSet = new HashSet<String>();
qSortQuerySet = new HashSet<String>();
qHashQuerySet = new HashSet<String>();
qSortNHashQuerySet = new HashSet<String>();
qJavaVersionSpecificOutput = new HashSet<String>();
this.clusterType = clusterType;
// Using randomUUID for dfs cluster
System.setProperty("test.build.data", "target/test-data/hive-" + UUID.randomUUID().toString
());
HadoopShims shims = ShimLoader.getHadoopShims();
int numberOfDataNodes = 1;
if (clusterType != MiniClusterType.none) {
dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
FileSystem fs = dfs.getFileSystem();
String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
if (clusterType == MiniClusterType.tez) {
conf.set("hive.execution.engine", "tez");
mr = shims.getMiniTezCluster(conf, 1, uriString, 1);
} else {
conf.set("hive.execution.engine", "mr");
mr = shims.getMiniMrCluster(conf, 1, uriString, 1);
}
}
initConf();
// Use the current directory if it is not specified
String dataDir = conf.get("test.data.files");
if (dataDir == null) {
dataDir = new File(".").getAbsolutePath() + "/data/files";
}
testFiles = dataDir;
// Use the current directory if it is not specified
String scriptsDir = conf.get("test.data.scripts");
if (scriptsDir == null) {
scriptsDir = new File(".").getAbsolutePath() + "/data/scripts";
}
if (!initScript.isEmpty()) {
this.initScript = scriptsDir + "/" + initScript;
}
if (!cleanupScript.isEmpty()) {
this.cleanupScript = scriptsDir + "/" + cleanupScript;
}
overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite"));
setup = new HiveTestSetup();
setup.preTest(conf);
init();
}
public void shutdown() throws Exception {
cleanUp();
setup.tearDown();
if (mr != null) {
mr.shutdown();
mr = null;
}
FileSystem.closeAll();
if (dfs != null) {
dfs.shutdown();
dfs = null;
}
}
public String readEntireFileIntoString(File queryFile) throws IOException {
InputStreamReader isr = new InputStreamReader(
new BufferedInputStream(new FileInputStream(queryFile)), HiveTestUtil.UTF_8);
StringWriter sw = new StringWriter();
try {
IOUtils.copy(isr, sw);
} finally {
if (isr != null) {
isr.close();
}
}
return sw.toString();
}
public void addFile(String queryFile) throws IOException {
addFile(queryFile, false);
}
public void addFile(String queryFile, boolean partial) throws IOException {
addFile(new File(queryFile));
}
public void addFile(File qf) throws IOException {
addFile(qf, false);
}
public void addFile(File qf, boolean partial) throws IOException {
String query = readEntireFileIntoString(qf);
qMap.put(qf.getName(), query);
if (partial) return;
if (matches(SORT_BEFORE_DIFF, query)) {
qSortSet.add(qf.getName());
} else if (matches(SORT_QUERY_RESULTS, query)) {
qSortQuerySet.add(qf.getName());
} else if (matches(HASH_QUERY_RESULTS, query)) {
qHashQuerySet.add(qf.getName());
} else if (matches(SORT_AND_HASH_QUERY_RESULTS, query)) {
qSortNHashQuerySet.add(qf.getName());
}
}
private static final Pattern SORT_BEFORE_DIFF = Pattern.compile("-- SORT_BEFORE_DIFF");
private static final Pattern SORT_QUERY_RESULTS = Pattern.compile("-- SORT_QUERY_RESULTS");
private static final Pattern HASH_QUERY_RESULTS = Pattern.compile("-- HASH_QUERY_RESULTS");
private static final Pattern SORT_AND_HASH_QUERY_RESULTS = Pattern.compile("-- " +
"SORT_AND_HASH_QUERY_RESULTS");
private boolean matches(Pattern pattern, String query) {
Matcher matcher = pattern.matcher(query);
if (matcher.find()) {
return true;
}
return false;
}
/**
* Get formatted Java version to include minor version, but
* exclude patch level.
*
* @return Java version formatted as major_version.minor_version
*/
private static String getJavaVersion() {
String version = System.getProperty("java.version");
if (version == null) {
throw new NullPointerException("No java version could be determined " +
"from system properties");
}
// "java version" system property is formatted
// major_version.minor_version.patch_level.
// Find second dot, instead of last dot, to be safe
int pos = version.indexOf('.');
pos = version.indexOf('.', pos + 1);
return version.substring(0, pos);
}
/**
* Clear out any side effects of running tests
*/
public void clearPostTestEffects() throws Exception {
setup.postTest(conf);
}
/**
* Clear out any side effects of running tests
*/
public void clearTablesCreatedDuringTests() throws Exception {
if (System.getenv(QTEST_LEAVE_FILES) != null) {
return;
}
// Delete any tables other than the source tables
// and any databases other than the default database.
for (String dbName : db.getAllDatabases()) {
SessionState.get().setCurrentDatabase(dbName);
for (String tblName : db.getAllTables()) {
if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
Table tblObj = db.getTable(tblName);
// dropping index table can not be dropped directly. Dropping the base
// table will automatically drop all its index table
if (tblObj.isIndexTable()) {
continue;
}
db.dropTable(dbName, tblName);
} else {
// this table is defined in srcTables, drop all indexes on it
List<Index> indexes = db.getIndexes(dbName, tblName, (short) -1);
if (indexes != null && indexes.size() > 0) {
for (Index index : indexes) {
db.dropIndex(dbName, tblName, index.getIndexName(), true, true);
}
}
}
}
if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
// Drop cascade, may need to drop functions
db.dropDatabase(dbName, true, true, true);
}
}
// delete remaining directories for external tables (can affect stats for following tests)
try {
Path p = new Path(testWarehouse);
FileSystem fileSystem = p.getFileSystem(conf);
if (fileSystem.exists(p)) {
for (FileStatus status : fileSystem.listStatus(p)) {
if (status.isDir()) {
fileSystem.delete(status.getPath(), true);
}
}
}
} catch (IllegalArgumentException e) {
// ignore.. provides invalid url sometimes intentionally
}
SessionState.get().setCurrentDatabase(DEFAULT_DATABASE_NAME);
List<String> roleNames = db.getAllRoleNames();
for (String roleName : roleNames) {
if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) {
db.dropRole(roleName);
}
}
}
/**
* Clear out any side effects of running tests
*/
public void clearTestSideEffects() throws Exception {
if (System.getenv(QTEST_LEAVE_FILES) != null) {
return;
}
clearTablesCreatedDuringTests();
}
public void cleanUp() throws Exception {
if (!isSessionStateStarted) {
startSessionState();
}
if (System.getenv(QTEST_LEAVE_FILES) != null) {
return;
}
clearTablesCreatedDuringTests();
SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", true);
if (cleanupScript != "") {
String cleanupCommands = readEntireFileIntoString(new File(cleanupScript));
LOG.info("Cleanup (" + cleanupScript + "):\n" + cleanupCommands);
if (cliDriver == null) {
cliDriver = new CliDriver();
}
cliDriver.processLine(cleanupCommands);
}
SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", false);
// delete any contents in the warehouse dir
Path p = new Path(testWarehouse);
FileSystem fs = p.getFileSystem(conf);
try {
FileStatus[] ls = fs.listStatus(p);
for (int i = 0; (ls != null) && (i < ls.length); i++) {
fs.delete(ls[i].getPath(), true);
}
} catch (FileNotFoundException e) {
// Best effort
}
FunctionRegistry.unregisterTemporaryUDF("test_udaf");
FunctionRegistry.unregisterTemporaryUDF("test_error");
}
public void createSources() throws Exception {
if (!isSessionStateStarted) {
startSessionState();
}
conf.setBoolean("hive.test.init.phase", true);
if (cliDriver == null) {
cliDriver = new CliDriver();
}
cliDriver.processLine("set test.data.dir=" + testFiles + ";");
conf.setBoolean("hive.test.init.phase", false);
}
public void init() throws Exception {
testWarehouse = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
conf.setBoolVar(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD, false);
String execEngine = conf.get("hive.execution.engine");
conf.set("hive.execution.engine", "mr");
SessionState.start(conf);
conf.set("hive.execution.engine", execEngine);
db = Hive.get(conf);
pd = new ParseDriver();
sem = new SemanticAnalyzer(conf);
}
public void init(String tname) throws Exception {
cleanUp();
createSources();
cliDriver.processCmd("set hive.cli.print.header=true;");
}
public void cliInit(String tname) throws Exception {
cliInit(tname, true);
}
public String cliInit(String tname, boolean recreate) throws Exception {
if (recreate) {
cleanUp();
createSources();
}
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
"org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator");
Utilities.clearWorkMap();
CliSessionState ss = new CliSessionState(conf);
assert ss != null;
ss.in = System.in;
String outFileExtension = getOutFileExtension(tname);
String stdoutName = null;
if (outDir != null) {
File qf = new File(outDir, tname);
stdoutName = qf.getName().concat(outFileExtension);
} else {
stdoutName = tname + outFileExtension;
}
File outf = new File(logDir, stdoutName);
OutputStream fo = new BufferedOutputStream(new FileOutputStream(outf));
if (qSortQuerySet.contains(tname)) {
ss.out = new SortPrintStream(fo, "UTF-8");
} else if (qHashQuerySet.contains(tname)) {
ss.out = new DigestPrintStream(fo, "UTF-8");
} else if (qSortNHashQuerySet.contains(tname)) {
ss.out = new SortAndDigestPrintStream(fo, "UTF-8");
} else {
ss.out = new PrintStream(fo, true, "UTF-8");
}
ss.err = new CachingPrintStream(fo, true, "UTF-8");
ss.setIsSilent(true);
SessionState oldSs = SessionState.get();
if (oldSs != null && clusterType == MiniClusterType.tez) {
oldSs.close();
}
if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {
oldSs.out.close();
}
SessionState.start(ss);
cliDriver = new CliDriver();
cliDriver.processInitFiles(ss);
return outf.getAbsolutePath();
}
private CliSessionState startSessionState()
throws IOException {
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
"org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator");
String execEngine = conf.get("hive.execution.engine");
conf.set("hive.execution.engine", "mr");
CliSessionState ss = new CliSessionState(conf);
assert ss != null;
ss.in = System.in;
ss.out = System.out;
ss.err = System.out;
SessionState oldSs = SessionState.get();
if (oldSs != null && clusterType == MiniClusterType.tez) {
oldSs.close();
}
if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {
oldSs.out.close();
}
SessionState.start(ss);
isSessionStateStarted = true;
conf.set("hive.execution.engine", execEngine);
return ss;
}
public int executeOne(String tname) {
String q = qMap.get(tname);
if (q.indexOf(";") == -1) {
return -1;
}
String q1 = q.substring(0, q.indexOf(";") + 1);
String qrest = q.substring(q.indexOf(";") + 1);
qMap.put(tname, qrest);
LOG.info("Executing " + q1);
return cliDriver.processLine(q1);
}
public static final String CRLF = System.getProperty("line.separator");
public int executeClient(String tname1, String tname2) {
String commands = getCommands(tname1) + CRLF + getCommands(tname2);
return cliDriver.processLine(commands);
}
public int executeClient(String tname) {
conf.set("mapreduce.job.name", "test");
return cliDriver.processLine(getCommands(tname), false);
}
private String getCommands(String tname) {
String commands = qMap.get(tname);
StringBuilder newCommands = new StringBuilder(commands.length());
int lastMatchEnd = 0;
Matcher commentMatcher = Pattern.compile("^--.*$", Pattern.MULTILINE).matcher(commands);
while (commentMatcher.find()) {
newCommands.append(commands.substring(lastMatchEnd, commentMatcher.start()));
newCommands.append(commentMatcher.group().replaceAll("(?<!\\\\);", "\\\\;"));
lastMatchEnd = commentMatcher.end();
}
newCommands.append(commands.substring(lastMatchEnd, commands.length()));
commands = newCommands.toString();
return commands;
}
public boolean shouldBeSkipped(String tname) {
return qSkipSet.contains(tname);
}
private String getOutFileExtension(String fname) {
String outFileExtension = ".out";
if (qJavaVersionSpecificOutput.contains(fname)) {
outFileExtension = ".java" + javaVersion + ".out";
}
return outFileExtension;
}
/**
* Given the current configurations (e.g., hadoop version and execution mode), return
* the correct file name to compare with the current test run output.
*
* @param outDir The directory where the reference log files are stored.
* @param testName The test file name (terminated by ".out").
* @return The file name appended with the configuration values if it exists.
*/
public String outPath(String outDir, String testName) {
String ret = (new File(outDir, testName)).getPath();
// List of configurations. Currently the list consists of hadoop version and execution
// mode only
List<String> configs = new ArrayList<String>();
configs.add(this.hadoopVer);
Deque<String> stack = new LinkedList<String>();
StringBuilder sb = new StringBuilder();
sb.append(testName);
stack.push(sb.toString());
// example file names are input1.q.out_0.20.0_minimr or input2.q.out_0.17
for (String s : configs) {
sb.append('_');
sb.append(s);
stack.push(sb.toString());
}
while (stack.size() > 0) {
String fileName = stack.pop();
File f = new File(outDir, fileName);
if (f.exists()) {
ret = f.getPath();
break;
}
}
return ret;
}
private Pattern[] toPattern(String[] patternStrs) {
Pattern[] patterns = new Pattern[patternStrs.length];
for (int i = 0; i < patternStrs.length; i++) {
patterns[i] = Pattern.compile(patternStrs[i]);
}
return patterns;
}
private void maskPatterns(Pattern[] patterns, String fname) throws Exception {
String maskPattern = "#### A masked pattern was here ####";
String line;
BufferedReader in;
BufferedWriter out;
File file = new File(fname);
File fileOrig = new File(fname + ".orig");
FileUtils.copyFile(file, fileOrig);
in = new BufferedReader(new InputStreamReader(new FileInputStream(fileOrig), "UTF-8"));
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8"));
boolean lastWasMasked = false;
while (null != (line = in.readLine())) {
for (Pattern pattern : patterns) {
line = pattern.matcher(line).replaceAll(maskPattern);
}
if (line.equals(maskPattern)) {
// We're folding multiple masked lines into one.
if (!lastWasMasked) {
out.write(line);
out.write("\n");
lastWasMasked = true;
}
} else {
out.write(line);
out.write("\n");
lastWasMasked = false;
}
}
in.close();
out.close();
}
private final Pattern[] planMask = toPattern(new String[]{
".*file:.*",
".*pfile:.*",
".*hdfs:.*",
".*/tmp/.*",
".*invalidscheme:.*",
".*lastUpdateTime.*",
".*lastAccessTime.*",
".*lastModifiedTime.*",
".*[Oo]wner.*",
".*CreateTime.*",
".*LastAccessTime.*",
".*Location.*",
".*LOCATION '.*",
".*transient_lastDdlTime.*",
".*last_modified_.*",
".*at org.*",
".*at sun.*",
".*at java.*",
".*at junit.*",
".*Caused by:.*",
".*LOCK_QUERYID:.*",
".*LOCK_TIME:.*",
".*grantTime.*",
".*[.][.][.] [0-9]* more.*",
".*job_[0-9_]*.*",
".*job_local[0-9_]*.*",
".*USING 'java -cp.*",
"^Deleted.*",
".*DagName:.*",
".*Input:.*/data/files/.*",
".*Output:.*/data/files/.*",
".*total number of created files now is.*"
});
public int checkCliDriverResults(String tname) throws Exception {
assert (qMap.containsKey(tname));
String outFileExtension = getOutFileExtension(tname);
String outFileName = outPath(outDir, tname + outFileExtension);
File f = new File(logDir, tname + outFileExtension);
maskPatterns(planMask, f.getPath());
int exitVal = executeDiffCommand(f.getPath(),
outFileName, false,
qSortSet.contains(tname));
if (exitVal != 0 && overWrite) {
exitVal = overwriteResults(f.getPath(), outFileName);
}
return exitVal;
}
public int checkCompareCliDriverResults(String tname, List<String> outputs) throws Exception {
assert outputs.size() > 1;
maskPatterns(planMask, outputs.get(0));
for (int i = 1; i < outputs.size(); ++i) {
maskPatterns(planMask, outputs.get(i));
int ecode = executeDiffCommand(
outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname));
if (ecode != 0) {
LOG.info("Files don't match: " + outputs.get(i - 1) + " and " + outputs.get(i));
return ecode;
}
}
return 0;
}
private static int overwriteResults(String inFileName, String outFileName) throws Exception {
// This method can be replaced with Files.copy(source, target, REPLACE_EXISTING)
// once Hive uses JAVA 7.
LOG.info("Overwriting results " + inFileName + " to " + outFileName);
return executeCmd(new String[]{
"cp",
getQuotedString(inFileName),
getQuotedString(outFileName)
});
}
private static int executeDiffCommand(String inFileName,
String outFileName,
boolean ignoreWhiteSpace,
boolean sortResults
) throws Exception {
int result = 0;
if (sortResults) {
// sort will try to open the output file in write mode on windows. We need to
// close it first.
SessionState ss = SessionState.get();
if (ss != null && ss.out != null && ss.out != System.out) {
ss.out.close();
}
String inSorted = inFileName + SORT_SUFFIX;
String outSorted = outFileName + SORT_SUFFIX;
result = sortFiles(inFileName, inSorted);
result |= sortFiles(outFileName, outSorted);
if (result != 0) {
LOG.error("ERROR: Could not sort files before comparing");
return result;
}
inFileName = inSorted;
outFileName = outSorted;
}
ArrayList<String> diffCommandArgs = new ArrayList<String>();
diffCommandArgs.add("diff");
// Text file comparison
diffCommandArgs.add("-a");
// Ignore changes in the amount of white space
if (ignoreWhiteSpace || Shell.WINDOWS) {
diffCommandArgs.add("-b");
}
// Files created on Windows machines have different line endings
// than files created on Unix/Linux. Windows uses carriage return and line feed
// ("\r\n") as a line ending, whereas Unix uses just line feed ("\n").
// Also StringBuilder.toString(), Stream to String conversions adds extra
// spaces at the end of the line.
if (Shell.WINDOWS) {
diffCommandArgs.add("--strip-trailing-cr"); // Strip trailing carriage return on input
diffCommandArgs.add("-B"); // Ignore changes whose lines are all blank
}
// Add files to compare to the arguments list
diffCommandArgs.add(getQuotedString(inFileName));
diffCommandArgs.add(getQuotedString(outFileName));
result = executeCmd(diffCommandArgs);
if (sortResults) {
new File(inFileName).delete();
new File(outFileName).delete();
}
return result;
}
private static int sortFiles(String in, String out) throws Exception {
return executeCmd(new String[]{
"sort",
getQuotedString(in),
}, out, null);
}
private static int executeCmd(Collection<String> args) throws Exception {
return executeCmd(args, null, null);
}
private static int executeCmd(String[] args) throws Exception {
return executeCmd(args, null, null);
}
private static int executeCmd(Collection<String> args, String outFile, String errFile) throws
Exception {
String[] cmdArray = args.toArray(new String[args.size()]);
return executeCmd(cmdArray, outFile, errFile);
}
private static int executeCmd(String[] args, String outFile, String errFile) throws Exception {
LOG.info("Running: " + org.apache.commons.lang.StringUtils.join(args, ' '));
PrintStream out = outFile == null ?
SessionState.getConsole().getChildOutStream() :
new PrintStream(new FileOutputStream(outFile), true);
PrintStream err = errFile == null ?
SessionState.getConsole().getChildErrStream() :
new PrintStream(new FileOutputStream(errFile), true);
Process executor = Runtime.getRuntime().exec(args);
StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err);
StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out);
outPrinter.start();
errPrinter.start();
int result = executor.waitFor();
outPrinter.join();
errPrinter.join();
if (outFile != null) {
out.close();
}
if (errFile != null) {
err.close();
}
return result;
}
private static String getQuotedString(String str) {
return Shell.WINDOWS ? String.format("\"%s\"", str) : str;
}
public ASTNode parseQuery(String tname) throws Exception {
return pd.parse(qMap.get(tname));
}
public void resetParser() throws SemanticException {
pd = new ParseDriver();
sem = new SemanticAnalyzer(conf);
}
public TreeMap<String, String> getQMap() {
return qMap;
}
/**
* HiveTestSetup defines test fixtures which are reused across testcases,
* and are needed before any test can be run
*/
public static class HiveTestSetup {
private MiniZooKeeperCluster zooKeeperCluster = null;
private int zkPort;
private ZooKeeper zooKeeper;
public HiveTestSetup() {
}
public void preTest(HiveConf conf) throws Exception {
if (zooKeeperCluster == null) {
//create temp dir
String tmpBaseDir = System.getProperty("test.tmp.dir");
File tmpDir = Utilities.createTempDir(tmpBaseDir);
zooKeeperCluster = new MiniZooKeeperCluster();
zkPort = zooKeeperCluster.startup(tmpDir);
}
if (zooKeeper != null) {
zooKeeper.close();
}
int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars
.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent arg0) {
}
});
String zkServer = "localhost";
conf.set("hive.zookeeper.quorum", zkServer);
conf.set("hive.zookeeper.client.port", "" + zkPort);
}
public void postTest(HiveConf conf) throws Exception {
if (zooKeeperCluster == null) {
return;
}
if (zooKeeper != null) {
zooKeeper.close();
}
ZooKeeperHiveLockManager.releaseAllLocks(conf);
}
public void tearDown() throws Exception {
if (zooKeeperCluster != null) {
zooKeeperCluster.shutdown();
zooKeeperCluster = null;
}
}
}
/**
* QTRunner: Runnable class for running a a single query file.
**/
public static class HiveTestRunner implements Runnable {
private final HiveTestUtil qt;
private final String fname;
public HiveTestRunner(HiveTestUtil qt, String fname) {
this.qt = qt;
this.fname = fname;
}
@Override
public void run() {
try {
// assumption is that environment has already been cleaned once globally
// hence each thread does not call cleanUp() and createSources() again
qt.cliInit(fname, false);
qt.executeClient(fname);
} catch (Throwable e) {
LOG.error("Query file " + fname + " failed with exception ", e);
e.printStackTrace();
outputTestFailureHelpMessage();
}
}
}
/**
* Executes a set of query files in sequence.
*
* @param qfiles array of input query files containing arbitrary number of hive
* queries
* @param qt array of HiveTestUtils, one per qfile
* @return true if all queries passed, false otw
*/
public static boolean queryListRunnerSingleThreaded(File[] qfiles, HiveTestUtil[] qt)
throws Exception {
boolean failed = false;
qt[0].cleanUp();
qt[0].createSources();
for (int i = 0; i < qfiles.length && !failed; i++) {
qt[i].clearTestSideEffects();
qt[i].cliInit(qfiles[i].getName(), false);
qt[i].executeClient(qfiles[i].getName());
int ecode = qt[i].checkCliDriverResults(qfiles[i].getName());
if (ecode != 0) {
failed = true;
LOG.error("Test " + qfiles[i].getName()
+ " results check failed with error code " + ecode);
outputTestFailureHelpMessage();
}
qt[i].clearPostTestEffects();
}
return (!failed);
}
public static void outputTestFailureHelpMessage() {
LOG.error("See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, "
+ "or check ./ql/target/surefire-reports or " +
"./itests/qtest/target/surefire-reports/ for specific test cases logs.");
}
public static String ensurePathEndsInSlash(String path) {
if (path == null) {
throw new NullPointerException("Path cannot be null");
}
if (path.endsWith(File.separator)) {
return path;
} else {
return path + File.separator;
}
}
private static String[] cachedQvFileList = null;
private static List<String> cachedDefaultQvFileList = null;
private static Pattern qvSuffix = Pattern.compile("_[0-9]+.qv$", Pattern.CASE_INSENSITIVE);
public static List<String> getVersionFiles(String queryDir, String tname) {
ensureQvFileList(queryDir);
List<String> result = getVersionFilesInternal(tname);
if (result == null) {
result = cachedDefaultQvFileList;
}
return result;
}
private static void ensureQvFileList(String queryDir) {
if (cachedQvFileList != null) return;
// Not thread-safe.
LOG.info("Getting versions from " + queryDir);
cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().endsWith(".qv");
}
});
if (cachedQvFileList == null) return; // no files at all
Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER);
List<String> defaults = getVersionFilesInternal("default");
cachedDefaultQvFileList = (defaults != null)
? Collections.unmodifiableList(new ArrayList<String>(defaults)) : Collections.unmodifiableList(new ArrayList<String>());
}
private static List<String> getVersionFilesInternal(String tname) {
if (cachedQvFileList == null) {
return new ArrayList<String>();
}
int pos = Arrays.binarySearch(cachedQvFileList, tname, String.CASE_INSENSITIVE_ORDER);
if (pos >= 0) {
throw new BuildException("Unexpected file list element: " + cachedQvFileList[pos]);
}
List<String> result = null;
for (pos = (-pos - 1); pos < cachedQvFileList.length; ++pos) {
String candidate = cachedQvFileList[pos];
if (candidate.length() <= tname.length()
|| !tname.equalsIgnoreCase(candidate.substring(0, tname.length()))
|| !qvSuffix.matcher(candidate.substring(tname.length())).matches()) {
break;
}
if (result == null) {
result = new ArrayList<String>();
}
result.add(candidate);
}
return result;
}
public void failed(int ecode, String fname, String debugHint) {
String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null;
Assert.fail("Client Execution failed with error code = " + ecode +
(command != null ? " running " + command : "") + (debugHint != null ? debugHint :
""));
}
// for negative tests, which is succeeded.. no need to print the query string
public void failed(String fname, String debugHint) {
Assert.fail("Client Execution was expected to fail, but succeeded with error code 0 " +
(debugHint != null ? debugHint : ""));
}
public void failedDiff(int ecode, String fname, String debugHint) {
Assert.fail("Client Execution results failed with error code = " + ecode +
(debugHint != null ? debugHint : ""));
}
public void failed(Throwable e, String fname, String debugHint) {
String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null;
LOG.error("Exception: ", e);
e.printStackTrace();
LOG.error("Failed query: " + fname);
Assert.fail("Unexpected exception " +
org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" +
(command != null ? " running " + command : "") +
(debugHint != null ? debugHint : ""));
}
public static class WindowsPathUtil {
public static void convertPathsFromWindowsToHdfs(HiveConf conf) {
// Following local paths are used as HDFS paths in unit tests.
// It works well in Unix as the path notation in Unix and HDFS is more or less same.
// But when it comes to Windows, drive letter separator ':' & backslash '\" are invalid
// characters in HDFS so we need to converts these local paths to HDFS paths before
// using them
// in unit tests.
String orgWarehouseDir = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, getHdfsUriString(orgWarehouseDir));
String orgTestTempDir = System.getProperty("test.tmp.dir");
System.setProperty("test.tmp.dir", getHdfsUriString(orgTestTempDir));
String orgTestWarehouseDir = System.getProperty("test.warehouse.dir");
System.setProperty("test.warehouse.dir", getHdfsUriString(orgTestWarehouseDir));
String orgScratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR);
conf.setVar(HiveConf.ConfVars.SCRATCHDIR, getHdfsUriString(orgScratchDir));
}
public static String getHdfsUriString(String uriStr) {
assert uriStr != null;
if (Shell.WINDOWS) {
// If the URI conversion is from Windows to HDFS then replace the '\' with '/'
// and remove the windows single drive letter & colon from absolute path.
return uriStr.replace('\\', '/')
.replaceFirst("/[c-zC-Z]:", "/")
.replaceFirst("^[c-zC-Z]:", "");
}
return uriStr;
}
}
}