| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hive.ql; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.FileReader; |
| import java.io.FileWriter; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.io.PrintStream; |
| import java.io.Serializable; |
| import java.io.StringWriter; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.FileSystems; |
| import java.nio.file.Files; |
| import java.nio.file.StandardOpenOption; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.Deque; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Stream; |
| |
| import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.io.output.ByteArrayOutputStream; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.tuple.ImmutablePair; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; |
| import org.apache.hadoop.hive.cli.CliDriver; |
| import org.apache.hadoop.hive.cli.CliSessionState; |
| import org.apache.hadoop.hive.common.io.CachingPrintStream; |
| import org.apache.hadoop.hive.common.io.DigestPrintStream; |
| import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream; |
| import org.apache.hadoop.hive.common.io.SortPrintStream; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.conf.HiveConf.ConfVars; |
| import org.apache.hadoop.hive.llap.io.api.LlapProxy; |
| import org.apache.hadoop.hive.metastore.Warehouse; |
| import org.apache.hadoop.hive.ql.exec.FunctionRegistry; |
| import org.apache.hadoop.hive.ql.exec.Task; |
| import org.apache.hadoop.hive.ql.exec.Utilities; |
| import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; |
| import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; |
| import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; |
| import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton; |
| import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; |
| import org.apache.hadoop.hive.ql.metadata.Hive; |
| import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; |
| import org.apache.hadoop.hive.ql.metadata.InvalidTableException; |
| import org.apache.hadoop.hive.ql.metadata.Table; |
| import org.apache.hadoop.hive.ql.parse.ASTNode; |
| import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; |
| import org.apache.hadoop.hive.ql.parse.ParseDriver; |
| import org.apache.hadoop.hive.ql.parse.ParseException; |
| import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; |
| import org.apache.hadoop.hive.ql.parse.SemanticException; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessor; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; |
| import org.apache.hadoop.hive.ql.processors.HiveCommand; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.apache.hadoop.hive.shims.HadoopShims; |
| import org.apache.hadoop.hive.shims.ShimLoader; |
| import org.apache.hive.common.util.StreamPrinter; |
| import org.apache.logging.log4j.util.Strings; |
| import org.apache.phoenix.compat.CompatUtil; |
| import org.apache.phoenix.compat.HiveCompatUtil; |
| import org.apache.phoenix.compat.MyResult; |
| import org.apache.tools.ant.BuildException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.junit.Assert; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.phoenix.thirdparty.com.google.common.base.Throwables; |
| import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; |
| |
| import junit.framework.TestSuite; |
| |
| /** |
 * QTestUtil. Cloned from Hive 3.0.0, as Hive does not release a hive-it-util artifact.
 * Some changes have been applied to make Hive 3 and Hive 2 work with the same file.
| * |
| */ |
| public class QTestUtil { |
| public static final String UTF_8 = "UTF-8"; |
| public static final String HIVE_ROOT = getHiveRoot(); |
| // security property names |
| private static final String SECURITY_KEY_PROVIDER_URI_NAME = "dfs.encryption.key.provider.uri"; |
  private static final String CRLF = System.getProperty("line.separator"); // platform line separator, not necessarily CR+LF
| |
| public static final String QTEST_LEAVE_FILES = "QTEST_LEAVE_FILES"; |
| private static final Logger LOG = LoggerFactory.getLogger("QTestUtil"); |
| private final static String defaultInitScript = "q_test_init.sql"; |
| private final static String defaultCleanupScript = "q_test_cleanup.sql"; |
| private final String[] testOnlyCommands = new String[]{"crypto"}; |
| |
| private static final String TEST_TMP_DIR_PROPERTY = "test.tmp.dir"; // typically target/tmp |
| private static final String BUILD_DIR_PROPERTY = "build.dir"; // typically target |
| |
| public static final String PATH_HDFS_REGEX = "(hdfs://)([a-zA-Z0-9:/_\\-\\.=])+"; |
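  // Matches environment-specific HDFS URIs, e.g. hdfs://localhost:8020/build/ql/tmp
  // (illustrative value); used further below to mask such paths in expected output.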
| public static final String PATH_HDFS_WITH_DATE_USER_GROUP_REGEX = "([a-z]+) ([a-z]+)([ ]+)([0-9]+) ([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}) " + PATH_HDFS_REGEX; |
| private static String DEFAULT_DATABASE_NAME = HiveCompatUtil.getDefaultDatabaseName(); |
| |
| private String testWarehouse; |
| private final String testFiles; |
| protected final String outDir; |
| protected String overrideResultsDir; |
| protected final String logDir; |
| private final TreeMap<String, String> qMap; |
| private final Set<String> qSkipSet; |
| private final Set<String> qSortSet; |
| private final Set<String> qSortQuerySet; |
| private final Set<String> qHashQuerySet; |
| private final Set<String> qSortNHashQuerySet; |
| private final Set<String> qNoSessionReuseQuerySet; |
| private final Set<String> qJavaVersionSpecificOutput; |
| private static final String SORT_SUFFIX = ".sorted"; |
| private final Set<String> srcTables; |
| private final Set<String> srcUDFs; |
| private final MiniClusterType clusterType; |
| private final FsType fsType; |
| private ParseDriver pd; |
| protected Hive db; |
| protected QueryState queryState; |
| protected HiveConf conf; |
| private Object drv; // IDriver in Phoenix5Hive3, null in Phoenix4Hive2 |
| private BaseSemanticAnalyzer sem; |
| protected final boolean overWrite; |
| private CliDriver cliDriver; |
| private HadoopShims.MiniMrShim mr = null; |
| private HadoopShims.MiniDFSShim dfs = null; |
| private FileSystem fs; |
| private HadoopShims.HdfsEncryptionShim hes = null; |
| private String hadoopVer = null; |
| private QTestSetup setup = null; |
| private SparkSession sparkSession = null; |
| private boolean isSessionStateStarted = false; |
| private static final String javaVersion = getJavaVersion(); |
| |
| private final String initScript; |
| private final String cleanupScript; |
| |
| |
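  /**
   * Callback that lets a test driver add a single qfile-based test (identified by tName)
   * to a JUnit {@link TestSuite}. Retained from the upstream Hive QTestUtil.
   */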
| public interface SuiteAddTestFunctor { |
| public void addTestToSuite(TestSuite suite, Object setup, String tName); |
| } |
| |
| public static Set<String> getSrcTables() { |
| HashSet<String> srcTables = new HashSet<String>(); |
    // FIXME: moved default value to here... for now
    // I think this feature is never really used from the command line
| String defaultTestSrcTables = "src,src1,srcbucket,srcbucket2,src_json,src_thrift," + |
| "src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part," + |
| "lineitem,alltypesparquet"; |
| for (String srcTable : System.getProperty("test.src.tables", defaultTestSrcTables).trim().split(",")) { |
| srcTable = srcTable.trim(); |
| if (!srcTable.isEmpty()) { |
| srcTables.add(srcTable); |
| } |
| } |
| if (srcTables.isEmpty()) { |
| throw new RuntimeException("Source tables cannot be empty"); |
| } |
| return srcTables; |
| } |
| |
| /** |
| * Returns the default UDF names which should not be removed when resetting the test database |
| * @return The list of the UDF names not to remove |
| */ |
| private Set<String> getSrcUDFs() { |
| HashSet<String> srcUDFs = new HashSet<String>(); |
    // FIXME: moved default value to here... for now
    // I think this feature is never really used from the command line
| String defaultTestSrcUDFs = "qtest_get_java_boolean"; |
| for (String srcUDF : System.getProperty("test.src.udfs", defaultTestSrcUDFs).trim().split(",")) |
| { |
| srcUDF = srcUDF.trim(); |
| if (!srcUDF.isEmpty()) { |
| srcUDFs.add(srcUDF); |
| } |
| } |
| if (srcUDFs.isEmpty()) { |
| throw new RuntimeException("Source UDFs cannot be empty"); |
| } |
| return srcUDFs; |
| } |
| |
| |
| |
| public HiveConf getConf() { |
| return conf; |
| } |
| |
| public boolean deleteDirectory(File path) { |
| if (path.exists()) { |
      File[] files = path.listFiles();
      if (files != null) { // listFiles() returns null when path is not a directory
        for (File file : files) {
          if (file.isDirectory()) {
            deleteDirectory(file);
          } else {
            file.delete();
          }
        }
      }
| } |
| return (path.delete()); |
| } |
| |
| public void copyDirectoryToLocal(Path src, Path dest) throws Exception { |
| |
| FileSystem srcFs = src.getFileSystem(conf); |
| FileSystem destFs = dest.getFileSystem(conf); |
| if (srcFs.exists(src)) { |
| FileStatus[] files = srcFs.listStatus(src); |
| for (FileStatus file : files) { |
| String name = file.getPath().getName(); |
| Path dfs_path = file.getPath(); |
| Path local_path = new Path(dest, name); |
| |
| // If this is a source table we do not copy it out |
| if (srcTables.contains(name)) { |
| continue; |
| } |
| |
| if (file.isDirectory()) { |
| if (!destFs.exists(local_path)) { |
| destFs.mkdirs(local_path); |
| } |
| copyDirectoryToLocal(dfs_path, local_path); |
| } else { |
| srcFs.copyToLocalFile(dfs_path, local_path); |
| } |
| } |
| } |
| } |
| |
| static Pattern mapTok = Pattern.compile("(\\.?)(.*)_map_(.*)"); |
| static Pattern reduceTok = Pattern.compile("(.*)(reduce_[^\\.]*)((\\..*)?)"); |
| |
| public void normalizeNames(File path) throws Exception { |
| if (path.isDirectory()) { |
| File[] files = path.listFiles(); |
| for (File file : files) { |
| normalizeNames(file); |
| } |
| } else { |
| Matcher m = reduceTok.matcher(path.getName()); |
| if (m.matches()) { |
| String name = m.group(1) + "reduce" + m.group(3); |
| path.renameTo(new File(path.getParent(), name)); |
| } else { |
| m = mapTok.matcher(path.getName()); |
| if (m.matches()) { |
| String name = m.group(1) + "map_" + m.group(3); |
| path.renameTo(new File(path.getParent(), name)); |
| } |
| } |
| } |
| } |
| |
| public String getOutputDirectory() { |
| return outDir; |
| } |
| |
| public String getLogDirectory() { |
| return logDir; |
| } |
| |
| private String getHadoopMainVersion(String input) { |
| if (input == null) { |
| return null; |
| } |
| Pattern p = Pattern.compile("^(\\d+\\.\\d+).*"); |
| Matcher m = p.matcher(input); |
| if (m.matches()) { |
| return m.group(1); |
| } |
| return null; |
| } |
| |
| public void initConf() throws Exception { |
| |
| String vectorizationEnabled = System.getProperty("test.vectorization.enabled"); |
| if(vectorizationEnabled != null && vectorizationEnabled.equalsIgnoreCase("true")) { |
| conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, true); |
| } |
| |
| // Plug verifying metastore in for testing DirectSQL. |
| conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, |
| "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); |
| |
| if (mr != null) { |
| mr.setupConfiguration(conf); |
| |
| // TODO Ideally this should be done independent of whether mr is setup or not. |
      setFsRelatedProperties(conf, fs.getScheme().equals("file"), fs);
| } |
| conf.set(ConfVars.HIVE_EXECUTION_ENGINE.varname, clusterType.name()); |
| } |
| |
| private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem fs) { |
| String fsUriString = fs.getUri().toString(); |
| |
| // Different paths if running locally vs a remote fileSystem. Ideally this difference should not exist. |
| Path warehousePath; |
| Path jarPath; |
| Path userInstallPath; |
| if (isLocalFs) { |
| String buildDir = System.getProperty(BUILD_DIR_PROPERTY); |
| Preconditions.checkState(Strings.isNotBlank(buildDir)); |
| Path path = new Path(fsUriString, buildDir); |
| |
| // Create a fake fs root for local fs |
| Path localFsRoot = new Path(path, "localfs"); |
| warehousePath = new Path(localFsRoot, "warehouse"); |
| jarPath = new Path(localFsRoot, "jar"); |
| userInstallPath = new Path(localFsRoot, "user_install"); |
| } else { |
| // TODO Why is this changed from the default in hive-conf? |
| warehousePath = new Path(fsUriString, "/build/ql/test/data/warehouse/"); |
| jarPath = new Path(new Path(fsUriString, "/user"), "hive"); |
| userInstallPath = new Path(fsUriString, "/user"); |
| } |
| |
| warehousePath = fs.makeQualified(warehousePath); |
| jarPath = fs.makeQualified(jarPath); |
| userInstallPath = fs.makeQualified(userInstallPath); |
| |
| conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsUriString); |
| |
| // Remote dirs |
| conf.setVar(ConfVars.METASTOREWAREHOUSE, warehousePath.toString()); |
| conf.setVar(ConfVars.HIVE_JAR_DIRECTORY, jarPath.toString()); |
| conf.setVar(ConfVars.HIVE_USER_INSTALL_DIR, userInstallPath.toString()); |
| // ConfVars.SCRATCHDIR - {test.tmp.dir}/scratchdir |
| |
| // Local dirs |
| // ConfVars.LOCALSCRATCHDIR - {test.tmp.dir}/localscratchdir |
| |
| // TODO Make sure to cleanup created dirs. |
| } |
| |
| private void createRemoteDirs() { |
| assert fs != null; |
| Path warehousePath = fs.makeQualified(new Path(conf.getVar(ConfVars.METASTOREWAREHOUSE))); |
| assert warehousePath != null; |
| Path hiveJarPath = fs.makeQualified(new Path(conf.getVar(ConfVars.HIVE_JAR_DIRECTORY))); |
| assert hiveJarPath != null; |
| Path userInstallPath = fs.makeQualified(new Path(conf.getVar(ConfVars.HIVE_USER_INSTALL_DIR))); |
| assert userInstallPath != null; |
| try { |
| fs.mkdirs(warehousePath); |
| } catch (IOException e) { |
| LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, |
| e.getMessage()); |
| } |
| try { |
| fs.mkdirs(hiveJarPath); |
| } catch (IOException e) { |
| LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, |
| e.getMessage()); |
| } |
| try { |
| fs.mkdirs(userInstallPath); |
| } catch (IOException e) { |
| LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, |
| e.getMessage()); |
| } |
| } |
| |
| private enum CoreClusterType { |
| MR, |
| TEZ, |
| SPARK, |
| DRUID |
| } |
| |
| public enum FsType { |
| local, |
| hdfs, |
| encrypted_hdfs, |
| } |
| |
| public enum MiniClusterType { |
| |
| mr(CoreClusterType.MR, FsType.hdfs), |
| tez(CoreClusterType.TEZ, FsType.hdfs), |
| tez_local(CoreClusterType.TEZ, FsType.local), |
| spark(CoreClusterType.SPARK, FsType.local), |
| miniSparkOnYarn(CoreClusterType.SPARK, FsType.hdfs), |
| llap(CoreClusterType.TEZ, FsType.hdfs), |
| llap_local(CoreClusterType.TEZ, FsType.local), |
| none(CoreClusterType.MR, FsType.local), |
| druid(CoreClusterType.DRUID, FsType.hdfs); |
| |
| |
| private final CoreClusterType coreClusterType; |
| private final FsType defaultFsType; |
| |
| MiniClusterType(CoreClusterType coreClusterType, FsType defaultFsType) { |
| this.coreClusterType = coreClusterType; |
| this.defaultFsType = defaultFsType; |
| } |
| |
| public CoreClusterType getCoreClusterType() { |
| return coreClusterType; |
| } |
| |
| public FsType getDefaultFsType() { |
| return defaultFsType; |
| } |
| |
| public static MiniClusterType valueForString(String type) { |
| // Replace this with valueOf. |
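      // e.g. valueForString("miniMR") returns mr; any unrecognized string falls back to none.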
| if (type.equals("miniMR")) { |
| return mr; |
| } else if (type.equals("tez")) { |
| return tez; |
| } else if (type.equals("tez_local")) { |
| return tez_local; |
| } else if (type.equals("spark")) { |
| return spark; |
| } else if (type.equals("miniSparkOnYarn")) { |
| return miniSparkOnYarn; |
| } else if (type.equals("llap")) { |
| return llap; |
| } else if (type.equals("llap_local")) { |
| return llap_local; |
| } else if (type.equals("druid")) { |
| return druid; |
| } else { |
| return none; |
| } |
| } |
| } |
| |
| |
| private String getKeyProviderURI() { |
    // Use the build target directory for the test key store
| String keyDir = HIVE_ROOT + "ql/target/"; |
| |
| // put the jks file in the current test path only for test purpose |
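    // the resulting URI looks like jceks://file/<hive root>/ql/target/test.jks (illustrative path)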
| return "jceks://file" + new Path(keyDir, "test.jks").toUri(); |
| } |
| |
| public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, |
| String confDir, String hadoopVer, String initScript, String cleanupScript, |
| boolean withLlapIo) throws Exception { |
| this(outDir, logDir, clusterType, confDir, hadoopVer, initScript, cleanupScript, |
| withLlapIo, null); |
| } |
| |
| public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, |
| String confDir, String hadoopVer, String initScript, String cleanupScript, |
| boolean withLlapIo, FsType fsType) |
| throws Exception { |
| LOG.info("Setting up QTestUtil with outDir="+outDir+", logDir="+logDir+", clusterType="+clusterType+", confDir="+confDir+"," + |
| " hadoopVer="+hadoopVer+", initScript="+initScript+", cleanupScript="+cleanupScript+", withLlapIo="+withLlapIo+"," + |
| " fsType="+fsType+""); |
| Preconditions.checkNotNull(clusterType, "ClusterType cannot be null"); |
| if (fsType != null) { |
| this.fsType = fsType; |
| } else { |
| this.fsType = clusterType.getDefaultFsType(); |
| } |
| this.outDir = outDir; |
| this.logDir = logDir; |
| this.srcTables=getSrcTables(); |
| this.srcUDFs = getSrcUDFs(); |
| |
| MyResult result = HiveCompatUtil.doSetup(confDir); |
| conf = result.getFirst(); |
| queryState = result.getSecond(); |
| |
| this.hadoopVer = getHadoopMainVersion(hadoopVer); |
| qMap = new TreeMap<String, String>(); |
| qSkipSet = new HashSet<String>(); |
| qSortSet = new HashSet<String>(); |
| qSortQuerySet = new HashSet<String>(); |
| qHashQuerySet = new HashSet<String>(); |
| qSortNHashQuerySet = new HashSet<String>(); |
| qNoSessionReuseQuerySet = new HashSet<String>(); |
| qJavaVersionSpecificOutput = new HashSet<String>(); |
| this.clusterType = clusterType; |
| |
| HadoopShims shims = ShimLoader.getHadoopShims(); |
| |
| setupFileSystem(shims); |
| |
| setup = new QTestSetup(); |
| setup.preTest(conf); |
| |
| setupMiniCluster(shims, confDir); |
| |
| initConf(); |
| |
| if (withLlapIo && (clusterType == MiniClusterType.none)) { |
| LOG.info("initializing llap IO"); |
| LlapProxy.initializeLlapIo(conf); |
| } |
| |
| |
| // Use the current directory if it is not specified |
| String dataDir = conf.get("test.data.files"); |
| if (dataDir == null) { |
| dataDir = new File(".").getAbsolutePath() + "/data/files"; |
| } |
| testFiles = dataDir; |
| |
| // Use the current directory if it is not specified |
| String scriptsDir = conf.get("test.data.scripts"); |
| if (scriptsDir == null) { |
| scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; |
| } |
| |
| this.initScript = scriptsDir + File.separator + initScript; |
| this.cleanupScript = scriptsDir + File.separator + cleanupScript; |
| |
| overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite")); |
| |
| init(); |
| } |
| |
| private void setupFileSystem(HadoopShims shims) throws IOException { |
| |
| if (fsType == FsType.local) { |
| fs = FileSystem.getLocal(conf); |
| } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs) { |
| int numDataNodes = 4; |
| |
| if (fsType == FsType.encrypted_hdfs) { |
| // Set the security key provider so that the MiniDFS cluster is initialized |
| // with encryption |
| conf.set(SECURITY_KEY_PROVIDER_URI_NAME, getKeyProviderURI()); |
| conf.setInt("fs.trash.interval", 50); |
| |
| dfs = shims.getMiniDfs(conf, numDataNodes, true, null); |
| fs = dfs.getFileSystem(); |
| |
| // set up the java key provider for encrypted hdfs cluster |
| hes = shims.createHdfsEncryptionShim(fs, conf); |
| |
| LOG.info("key provider is initialized"); |
| } else { |
| dfs = shims.getMiniDfs(conf, numDataNodes, true, null); |
| fs = dfs.getFileSystem(); |
| } |
| } else { |
| throw new IllegalArgumentException("Unknown or unhandled fsType [" + fsType + "]"); |
| } |
| } |
| |
| private void setupMiniCluster(HadoopShims shims, String confDir) throws |
| IOException { |
| |
| String uriString = fs.getUri().toString(); |
| |
| if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { |
| if (confDir != null && !confDir.isEmpty()) { |
| conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() |
| + "/tez-site.xml")); |
| } |
| int numTrackers = 2; |
| if (EnumSet.of(MiniClusterType.llap_local, MiniClusterType.tez_local).contains(clusterType)) { |
| mr = shims.getLocalMiniTezCluster(conf, clusterType == MiniClusterType.llap_local); |
| } else { |
| mr = shims.getMiniTezCluster(conf, numTrackers, uriString, |
| EnumSet.of(MiniClusterType.llap, MiniClusterType.llap_local).contains(clusterType)); |
| } |
| } else if (clusterType == MiniClusterType.miniSparkOnYarn) { |
| mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); |
| } else if (clusterType == MiniClusterType.mr) { |
| mr = shims.getMiniMrCluster(conf, 2, uriString, 1); |
| } |
| } |
| |
| |
| public void shutdown() throws Exception { |
| if (System.getenv(QTEST_LEAVE_FILES) == null) { |
| cleanUp(); |
| } |
| |
| if (CompatUtil.isPhoenix5() && clusterType.getCoreClusterType() == CoreClusterType.TEZ){ |
| HiveCompatUtil.destroyTEZSession(SessionState.get()); |
| } |
| |
| setup.tearDown(); |
| if (sparkSession != null) { |
| try { |
| SparkSessionManagerImpl.getInstance().closeSession(sparkSession); |
| } catch (Exception ex) { |
| LOG.error("Error closing spark session.", ex); |
| } finally { |
| sparkSession = null; |
| } |
| } |
| if (mr != null) { |
| mr.shutdown(); |
| mr = null; |
| } |
| FileSystem.closeAll(); |
| if (dfs != null) { |
| dfs.shutdown(); |
| dfs = null; |
| } |
| Hive.closeCurrent(); |
| } |
| |
| public String readEntireFileIntoString(File queryFile) throws IOException { |
| InputStreamReader isr = new InputStreamReader( |
| new BufferedInputStream(new FileInputStream(queryFile)), QTestUtil.UTF_8); |
| StringWriter sw = new StringWriter(); |
| try { |
| IOUtils.copy(isr, sw); |
| } finally { |
| if (isr != null) { |
| isr.close(); |
| } |
| } |
| return sw.toString(); |
| } |
| |
| public void addFile(String queryFile) throws IOException { |
| addFile(queryFile, false); |
| } |
| |
| public void addFile(String queryFile, boolean partial) throws IOException { |
    addFile(new File(queryFile), partial);
| } |
| |
| public void addFile(File qf) throws IOException { |
| addFile(qf, false); |
| } |
| |
| public void addFile(File qf, boolean partial) throws IOException { |
| String query = readEntireFileIntoString(qf); |
| qMap.put(qf.getName(), query); |
| if (partial) { |
| return; |
| } |
| |
| if(checkHadoopVersionExclude(qf.getName(), query)) { |
| qSkipSet.add(qf.getName()); |
| } |
| |
| if (checkNeedJavaSpecificOutput(qf.getName(), query)) { |
| qJavaVersionSpecificOutput.add(qf.getName()); |
| } |
| |
| if (matches(SORT_BEFORE_DIFF, query)) { |
| qSortSet.add(qf.getName()); |
| } else if (matches(SORT_QUERY_RESULTS, query)) { |
| qSortQuerySet.add(qf.getName()); |
| } else if (matches(HASH_QUERY_RESULTS, query)) { |
| qHashQuerySet.add(qf.getName()); |
| } else if (matches(SORT_AND_HASH_QUERY_RESULTS, query)) { |
| qSortNHashQuerySet.add(qf.getName()); |
| } |
| if (matches(NO_SESSION_REUSE, query)) { |
| qNoSessionReuseQuerySet.add(qf.getName()); |
| } |
| } |
| |
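  // Directives recognized anywhere in a qfile. For example, a qfile containing the line
  // "-- SORT_QUERY_RESULTS" has its output sorted before it is compared with the expected file.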
| private static final Pattern SORT_BEFORE_DIFF = Pattern.compile("-- SORT_BEFORE_DIFF"); |
| private static final Pattern SORT_QUERY_RESULTS = Pattern.compile("-- SORT_QUERY_RESULTS"); |
| private static final Pattern HASH_QUERY_RESULTS = Pattern.compile("-- HASH_QUERY_RESULTS"); |
| private static final Pattern SORT_AND_HASH_QUERY_RESULTS = Pattern.compile("-- SORT_AND_HASH_QUERY_RESULTS"); |
| private static final Pattern NO_SESSION_REUSE = Pattern.compile("-- NO_SESSION_REUSE"); |
| |
  private boolean matches(Pattern pattern, String query) {
    return pattern.matcher(query).find();
  }
| |
| private boolean checkHadoopVersionExclude(String fileName, String query){ |
| |
| // Look for a hint to not run a test on some Hadoop versions |
| Pattern pattern = Pattern.compile("-- (EX|IN)CLUDE_HADOOP_MAJOR_VERSIONS\\((.*)\\)"); |
| |
| boolean excludeQuery = false; |
| boolean includeQuery = false; |
| Set<String> versionSet = new HashSet<String>(); |
| String hadoopVer = ShimLoader.getMajorVersion(); |
| |
| Matcher matcher = pattern.matcher(query); |
| |
| // Each qfile may include at most one INCLUDE or EXCLUDE directive. |
| // |
| // If a qfile contains an INCLUDE directive, and hadoopVer does |
| // not appear in the list of versions to include, then the qfile |
| // is skipped. |
| // |
| // If a qfile contains an EXCLUDE directive, and hadoopVer is |
| // listed in the list of versions to EXCLUDE, then the qfile is |
| // skipped. |
| // |
| // Otherwise, the qfile is included. |
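    //
    // For example, "-- EXCLUDE_HADOOP_MAJOR_VERSIONS(0.20, 0.23)" skips the qfile when the
    // detected Hadoop major version is 0.20 or 0.23 (version numbers here are illustrative).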
| |
| if (matcher.find()) { |
| |
| String prefix = matcher.group(1); |
| if ("EX".equals(prefix)) { |
| excludeQuery = true; |
| } else { |
| includeQuery = true; |
| } |
| |
| String versions = matcher.group(2); |
| for (String s : versions.split("\\,")) { |
| s = s.trim(); |
| versionSet.add(s); |
| } |
| } |
| |
| if (matcher.find()) { |
| //2nd match is not supposed to be there |
| String message = "QTestUtil: qfile " + fileName |
| + " contains more than one reference to (EX|IN)CLUDE_HADOOP_MAJOR_VERSIONS"; |
| throw new UnsupportedOperationException(message); |
| } |
| |
| if (excludeQuery && versionSet.contains(hadoopVer)) { |
| System.out.println("QTestUtil: " + fileName |
| + " EXCLUDE list contains Hadoop Version " + hadoopVer + ". Skipping..."); |
| return true; |
| } else if (includeQuery && !versionSet.contains(hadoopVer)) { |
| System.out.println("QTestUtil: " + fileName |
| + " INCLUDE list does not contain Hadoop Version " + hadoopVer + ". Skipping..."); |
| return true; |
| } |
| return false; |
| } |
| |
| private boolean checkNeedJavaSpecificOutput(String fileName, String query) { |
| Pattern pattern = Pattern.compile("-- JAVA_VERSION_SPECIFIC_OUTPUT"); |
| Matcher matcher = pattern.matcher(query); |
| if (matcher.find()) { |
| System.out.println("Test is flagged to generate Java version specific " + |
| "output. Since we are using Java version " + javaVersion + |
| ", we will generated Java " + javaVersion + " specific " + |
| "output file for query file " + fileName); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Get formatted Java version to include minor version, but |
| * exclude patch level. |
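   * For example, a "java.version" of "1.8.0_181" is reported as "1.8" (illustrative value).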
| * |
| * @return Java version formatted as major_version.minor_version |
| */ |
| private static String getJavaVersion() { |
| String version = System.getProperty("java.version"); |
| if (version == null) { |
| throw new NullPointerException("No java version could be determined " + |
| "from system properties"); |
| } |
| |
| // "java version" system property is formatted |
| // major_version.minor_version.patch_level. |
| // Find second dot, instead of last dot, to be safe |
| int pos = version.indexOf('.'); |
| pos = version.indexOf('.', pos + 1); |
| return version.substring(0, pos); |
| } |
| |
| /** |
| * Clear out any side effects of running tests |
| */ |
| public void clearPostTestEffects() throws Exception { |
| setup.postTest(conf); |
| } |
| |
| public void clearKeysCreatedInTests() { |
| if (hes == null) { |
| return; |
| } |
| try { |
| for (String keyAlias : hes.getKeys()) { |
| hes.deleteKey(keyAlias); |
| } |
| } catch (IOException e) { |
| LOG.error("Fail to clean the keys created in test due to the error", e); |
| } |
| } |
| |
| public void clearUDFsCreatedDuringTests() throws Exception { |
| if (System.getenv(QTEST_LEAVE_FILES) != null) { |
| return; |
| } |
    // Delete functions created by the tests.
    // It is enough to remove functions from the default database; other databases are dropped entirely.
| for (String udfName : db.getFunctions(DEFAULT_DATABASE_NAME, ".*")) { |
| if (!srcUDFs.contains(udfName)) { |
| db.dropFunction(DEFAULT_DATABASE_NAME, udfName); |
| } |
| } |
| } |
| |
| /** |
| * Clear out any side effects of running tests |
| */ |
| public void clearTablesCreatedDuringTests() throws Exception { |
| if (System.getenv(QTEST_LEAVE_FILES) != null) { |
| return; |
| } |
| |
| conf.set("hive.metastore.filter.hook", |
| "org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl"); |
| db = Hive.get(conf); |
| |
| // First delete any MVs to avoid race conditions |
| for (String dbName : db.getAllDatabases()) { |
| SessionState.get().setCurrentDatabase(dbName); |
| for (String tblName : db.getAllTables()) { |
| Table tblObj = null; |
| try { |
| tblObj = db.getTable(tblName); |
| } catch (InvalidTableException e) { |
| LOG.warn("Trying to drop table " + e.getTableName() + ". But it does not exist."); |
| continue; |
| } |
| // only remove MVs first |
| if (!tblObj.isMaterializedView()) { |
| continue; |
| } |
| db.dropTable(dbName, tblName, true, true, fsType == FsType.encrypted_hdfs); |
| } |
| } |
| |
| // Delete any tables other than the source tables |
| // and any databases other than the default database. |
| for (String dbName : db.getAllDatabases()) { |
| SessionState.get().setCurrentDatabase(dbName); |
| for (String tblName : db.getAllTables()) { |
| if (!DEFAULT_DATABASE_NAME.equals(dbName) || !srcTables.contains(tblName)) { |
| Table tblObj = null; |
| try { |
| tblObj = db.getTable(tblName); |
| } catch (InvalidTableException e) { |
| LOG.warn("Trying to drop table " + e.getTableName() + ". But it does not exist."); |
| continue; |
| } |
          // MVs were already dropped in the first pass above; drop everything else here
          if (tblObj.isMaterializedView()) {
| continue; |
| } |
| db.dropTable(dbName, tblName, true, true, fsType == FsType.encrypted_hdfs); |
| } |
| } |
| if (!DEFAULT_DATABASE_NAME.equals(dbName)) { |
| // Drop cascade, functions dropped by cascade |
| db.dropDatabase(dbName, true, true, true); |
| } |
| } |
| |
| // delete remaining directories for external tables (can affect stats for following tests) |
| try { |
| Path p = new Path(testWarehouse); |
| FileSystem fileSystem = p.getFileSystem(conf); |
| if (fileSystem.exists(p)) { |
| for (FileStatus status : fileSystem.listStatus(p)) { |
| if (status.isDirectory() && !srcTables.contains(status.getPath().getName())) { |
| fileSystem.delete(status.getPath(), true); |
| } |
| } |
| } |
| } catch (IllegalArgumentException e) { |
      // ignore; the test sometimes intentionally provides an invalid URL
| } |
| SessionState.get().setCurrentDatabase(DEFAULT_DATABASE_NAME); |
| |
| List<String> roleNames = db.getAllRoleNames(); |
| for (String roleName : roleNames) { |
| if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) { |
| db.dropRole(roleName); |
| } |
| } |
| } |
| |
| /** |
| * Clear out any side effects of running tests |
| */ |
| public void clearTestSideEffects() throws Exception { |
| if (System.getenv(QTEST_LEAVE_FILES) != null) { |
| return; |
| } |
| |
| if(CompatUtil.isPhoenix5()){ |
| HiveCompatUtil.cleanupQueryResultCache(); |
| } |
| |
| // allocate and initialize a new conf since a test can |
| // modify conf by using 'set' commands |
| conf = HiveCompatUtil.getHiveConf(); |
| initConf(); |
| initConfFromSetup(); |
| |
| // renew the metastore since the cluster type is unencrypted |
| db = Hive.get(conf); // propagate new conf to meta store |
| |
| clearTablesCreatedDuringTests(); |
| clearUDFsCreatedDuringTests(); |
| clearKeysCreatedInTests(); |
| } |
| |
| protected void initConfFromSetup() throws Exception { |
| setup.preTest(conf); |
| } |
| |
| public void cleanUp() throws Exception { |
| cleanUp(null); |
| } |
| |
| public void cleanUp(String tname) throws Exception { |
| boolean canReuseSession = (tname == null) || !qNoSessionReuseQuerySet.contains(tname); |
| if(!isSessionStateStarted) { |
| startSessionState(canReuseSession); |
| } |
| if (System.getenv(QTEST_LEAVE_FILES) != null) { |
| return; |
| } |
| |
| clearTablesCreatedDuringTests(); |
| clearUDFsCreatedDuringTests(); |
| clearKeysCreatedInTests(); |
| |
| File cleanupFile = new File(cleanupScript); |
| if (cleanupFile.isFile()) { |
| String cleanupCommands = readEntireFileIntoString(cleanupFile); |
| LOG.info("Cleanup (" + cleanupScript + "):\n" + cleanupCommands); |
| if(cliDriver == null) { |
| cliDriver = new CliDriver(); |
| } |
| SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", true); |
| int result = cliDriver.processLine(cleanupCommands); |
| if (result != 0) { |
| LOG.error("Failed during cleanup processLine with code={}. Ignoring", result); |
| // TODO Convert this to an Assert.fail once HIVE-14682 is fixed |
| } |
| SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", false); |
| } else { |
| LOG.info("No cleanup script detected. Skipping."); |
| } |
| |
| // delete any contents in the warehouse dir |
| Path p = new Path(testWarehouse); |
| FileSystem fs = p.getFileSystem(conf); |
| |
| try { |
| FileStatus [] ls = fs.listStatus(p); |
| for (int i=0; (ls != null) && (i<ls.length); i++) { |
| fs.delete(ls[i].getPath(), true); |
| } |
| } catch (FileNotFoundException e) { |
| // Best effort |
| } |
| |
| // TODO: Clean up all the other paths that are created. |
| |
| FunctionRegistry.unregisterTemporaryUDF("test_udaf"); |
| FunctionRegistry.unregisterTemporaryUDF("test_error"); |
| } |
| |
| protected void runCreateTableCmd(String createTableCmd) throws Exception { |
| int ecode = 0; |
| ecode = HiveCompatUtil.getDriverResponseCode(drv, createTableCmd); |
| if (ecode != 0) { |
| throw new Exception("create table command: " + createTableCmd |
| + " failed with exit code= " + ecode); |
| } |
| |
| return; |
| } |
| |
| protected void runCmd(String cmd) throws Exception { |
| int ecode = 0; |
| ecode = HiveCompatUtil.getDriverResponseCode(drv, cmd); |
| HiveCompatUtil.closeDriver(drv); |
| if (ecode != 0) { |
| throw new Exception("command: " + cmd |
| + " failed with exit code= " + ecode); |
| } |
| return; |
| } |
| |
| public void createSources() throws Exception { |
| createSources(null); |
| } |
| |
| public void createSources(String tname) throws Exception { |
| boolean canReuseSession = (tname == null) || !qNoSessionReuseQuerySet.contains(tname); |
| if(!isSessionStateStarted) { |
| startSessionState(canReuseSession); |
| } |
| |
| if(cliDriver == null) { |
| cliDriver = new CliDriver(); |
| } |
| cliDriver.processLine("set test.data.dir=" + testFiles + ";"); |
| File scriptFile = new File(this.initScript); |
| if (!scriptFile.isFile()) { |
| LOG.info("No init script detected. Skipping"); |
| return; |
| } |
| conf.setBoolean("hive.test.init.phase", true); |
| |
| String initCommands = readEntireFileIntoString(scriptFile); |
| LOG.info("Initial setup (" + initScript + "):\n" + initCommands); |
| |
| int result = cliDriver.processLine(initCommands); |
| LOG.info("Result from cliDrriver.processLine in createSources=" + result); |
| if (result != 0) { |
| Assert.fail("Failed during createSources processLine with code=" + result); |
| } |
| |
| conf.setBoolean("hive.test.init.phase", false); |
| } |
| |
| public void init() throws Exception { |
| |
| // Create remote dirs once. |
| if (mr != null) { |
| createRemoteDirs(); |
| } |
| |
| if (CompatUtil.isPhoenix5()) { |
| // Create views registry |
| HiveCompatUtil.initHiveMaterializedViewsRegistry(); |
| } |
| |
| testWarehouse = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); |
| String execEngine = conf.get("hive.execution.engine"); |
| conf.set("hive.execution.engine", "mr"); |
| SessionState.start(conf); |
| conf.set("hive.execution.engine", execEngine); |
| db = Hive.get(conf); |
| drv = HiveCompatUtil.getDriver(conf); |
| if(CompatUtil.isPhoenix4()) { |
| HiveCompatUtil.initHiveMaterializedViewsRegistry(db); |
| } |
| pd = new ParseDriver(); |
| sem = new SemanticAnalyzer(queryState); |
| } |
| |
| public void init(String tname) throws Exception { |
| cleanUp(tname); |
| createSources(tname); |
| cliDriver.processCmd("set hive.cli.print.header=true;"); |
| } |
| |
| public void cliInit(String tname) throws Exception { |
| cliInit(tname, true); |
| } |
| |
| public String cliInit(String tname, boolean recreate) throws Exception { |
| if (recreate) { |
| cleanUp(tname); |
| createSources(tname); |
| } |
| |
| HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, |
| "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); |
| Utilities.clearWorkMap(conf); |
| CliSessionState ss = new CliSessionState(conf); |
| assert ss != null; |
| ss.in = System.in; |
| |
| String outFileExtension = getOutFileExtension(tname); |
| String stdoutName = null; |
| if (outDir != null) { |
| // TODO: why is this needed? |
| File qf = new File(outDir, tname); |
| stdoutName = qf.getName().concat(outFileExtension); |
| } else { |
| stdoutName = tname + outFileExtension; |
| } |
| |
| File outf = new File(logDir, stdoutName); |
| OutputStream fo = new BufferedOutputStream(new FileOutputStream(outf)); |
| if (qSortQuerySet.contains(tname)) { |
| ss.out = new SortPrintStream(fo, "UTF-8"); |
| } else if (qHashQuerySet.contains(tname)) { |
| ss.out = new DigestPrintStream(fo, "UTF-8"); |
| } else if (qSortNHashQuerySet.contains(tname)) { |
| ss.out = new SortAndDigestPrintStream(fo, "UTF-8"); |
| } else { |
| ss.out = new PrintStream(fo, true, "UTF-8"); |
| } |
| ss.err = new CachingPrintStream(fo, true, "UTF-8"); |
| ss.setIsSilent(true); |
| SessionState oldSs = SessionState.get(); |
| |
| boolean canReuseSession = !qNoSessionReuseQuerySet.contains(tname); |
| if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { |
| // Copy the tezSessionState from the old CliSessionState. |
| TezSessionState tezSessionState = oldSs.getTezSession(); |
| oldSs.setTezSession(null); |
| ss.setTezSession(tezSessionState); |
| oldSs.close(); |
| } |
| |
| if (oldSs != null && clusterType.getCoreClusterType() == CoreClusterType.SPARK) { |
| sparkSession = oldSs.getSparkSession(); |
| ss.setSparkSession(sparkSession); |
| oldSs.setSparkSession(null); |
| oldSs.close(); |
| } |
| |
| if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { |
| oldSs.out.close(); |
| } |
| if (oldSs != null) { |
| oldSs.close(); |
| } |
| SessionState.start(ss); |
| |
| cliDriver = new CliDriver(); |
| |
| if (tname.equals("init_file.q")) { |
| ss.initFiles.add(HIVE_ROOT + "/data/scripts/test_init_file.sql"); |
| } |
| cliDriver.processInitFiles(ss); |
| |
| return outf.getAbsolutePath(); |
| } |
| |
| private CliSessionState startSessionState(boolean canReuseSession) |
| throws IOException { |
| |
| HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, |
| "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); |
| |
| String execEngine = conf.get("hive.execution.engine"); |
| conf.set("hive.execution.engine", "mr"); |
| CliSessionState ss = new CliSessionState(conf); |
| assert ss != null; |
| ss.in = System.in; |
| ss.out = System.out; |
| ss.err = System.out; |
| |
| SessionState oldSs = SessionState.get(); |
| if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { |
| // Copy the tezSessionState from the old CliSessionState. |
| TezSessionState tezSessionState = oldSs.getTezSession(); |
| ss.setTezSession(tezSessionState); |
| oldSs.setTezSession(null); |
| oldSs.close(); |
| } |
| |
| if (oldSs != null && clusterType.getCoreClusterType() == CoreClusterType.SPARK) { |
| sparkSession = oldSs.getSparkSession(); |
| ss.setSparkSession(sparkSession); |
| oldSs.setSparkSession(null); |
| oldSs.close(); |
| } |
| if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { |
| oldSs.out.close(); |
| } |
| if (oldSs != null) { |
| oldSs.close(); |
| } |
| SessionState.start(ss); |
| |
| isSessionStateStarted = true; |
| |
| conf.set("hive.execution.engine", execEngine); |
| return ss; |
| } |
| |
| public int executeAdhocCommand(String q) { |
| if (!q.contains(";")) { |
| return -1; |
| } |
| |
| String q1 = q.split(";")[0] + ";"; |
| |
| LOG.debug("Executing " + q1); |
| return cliDriver.processLine(q1); |
| } |
| |
| public int executeOne(String tname) { |
| String q = qMap.get(tname); |
| |
| if (q.indexOf(";") == -1) { |
| return -1; |
| } |
| |
| String q1 = q.substring(0, q.indexOf(";") + 1); |
| String qrest = q.substring(q.indexOf(";") + 1); |
| qMap.put(tname, qrest); |
| |
| System.out.println("Executing " + q1); |
| return cliDriver.processLine(q1); |
| } |
| |
| public int execute(String tname) { |
| return HiveCompatUtil.getDriverResponseCode(drv, qMap.get(tname)); |
| } |
| |
| public int executeClient(String tname1, String tname2) { |
| String commands = getCommand(tname1) + CRLF + getCommand(tname2); |
| return executeClientInternal(commands); |
| } |
| |
| public int executeClient(String tname) { |
| return executeClientInternal(getCommand(tname)); |
| } |
| |
| private int executeClientInternal(String commands) { |
| List<String> cmds = CliDriver.splitSemiColon(commands); |
| int rc = 0; |
| |
| String command = ""; |
| for (String oneCmd : cmds) { |
| if (StringUtils.endsWith(oneCmd, "\\")) { |
| command += StringUtils.chop(oneCmd) + "\\;"; |
| continue; |
| } else { |
| if (isHiveCommand(oneCmd)) { |
| command = oneCmd; |
| } else { |
| command += oneCmd; |
| } |
| } |
| if (StringUtils.isBlank(command)) { |
| continue; |
| } |
| |
| if (isCommandUsedForTesting(command)) { |
| rc = executeTestCommand(command); |
| } else { |
| rc = cliDriver.processLine(command); |
| } |
| |
| if (rc != 0 && !ignoreErrors()) { |
| break; |
| } |
| command = ""; |
| } |
| if (rc == 0 && SessionState.get() != null) { |
| SessionState.get().setLastCommand(null); // reset |
| } |
| return rc; |
| } |
| |
| /** |
   * This allows a .q file to continue executing after a statement runs into an error, which is convenient
   * if you want to run another Hive command after the failure to sanity-check the state of the system.
| */ |
| private boolean ignoreErrors() { |
| return conf.getBoolVar(HiveConf.ConfVars.CLIIGNOREERRORS); |
| } |
| |
| private boolean isHiveCommand(String command) { |
| String[] cmd = command.trim().split("\\s+"); |
| if (HiveCommand.find(cmd) != null) { |
| return true; |
| } else if (HiveCommand.find(cmd, HiveCommand.ONLY_FOR_TESTING) != null) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| private int executeTestCommand(final String command) { |
| String commandName = command.trim().split("\\s+")[0]; |
| String commandArgs = command.trim().substring(commandName.length()); |
| |
| if (commandArgs.endsWith(";")) { |
| commandArgs = StringUtils.chop(commandArgs); |
| } |
| |
| //replace ${hiveconf:hive.metastore.warehouse.dir} with actual dir if existed. |
| //we only want the absolute path, so remove the header, such as hdfs://localhost:57145 |
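    // e.g. "dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/t1" becomes
    // "dfs -ls /build/ql/test/data/warehouse/t1" (paths shown are illustrative)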
| String wareHouseDir = SessionState.get().getConf().getVar(ConfVars.METASTOREWAREHOUSE) |
| .replaceAll("^[a-zA-Z]+://.*?:\\d+", ""); |
| commandArgs = commandArgs.replaceAll("\\$\\{hiveconf:hive\\.metastore\\.warehouse\\.dir\\}", |
| wareHouseDir); |
| |
| if (SessionState.get() != null) { |
| SessionState.get().setLastCommand(commandName + " " + commandArgs.trim()); |
| } |
| |
| enableTestOnlyCmd(SessionState.get().getConf()); |
| |
| try { |
| CommandProcessor proc = getTestCommand(commandName); |
| if (proc != null) { |
| CommandProcessorResponse response = proc.run(commandArgs.trim()); |
| |
| int rc = response.getResponseCode(); |
| if (rc != 0) { |
| SessionState.getConsole().printError(response.toString(), response.getException() != null ? |
| Throwables.getStackTraceAsString(response.getException()) : ""); |
| } |
| |
| return rc; |
| } else { |
| throw new RuntimeException("Could not get CommandProcessor for command: " + commandName); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("Could not execute test command", e); |
| } |
| } |
| |
| private CommandProcessor getTestCommand(final String commandName) throws SQLException { |
| HiveCommand testCommand = HiveCommand.find(new String[]{commandName}, HiveCommand.ONLY_FOR_TESTING); |
| |
| if (testCommand == null) { |
| return null; |
| } |
| |
| return CommandProcessorFactory |
| .getForHiveCommandInternal(new String[]{commandName}, SessionState.get().getConf(), |
| testCommand.isOnlyForTesting()); |
| } |
| |
| private void enableTestOnlyCmd(HiveConf conf){ |
| StringBuilder securityCMDs = new StringBuilder(conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)); |
| for(String c : testOnlyCommands){ |
| securityCMDs.append(","); |
| securityCMDs.append(c); |
| } |
| conf.set(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST.toString(), securityCMDs.toString()); |
| } |
| |
| private boolean isCommandUsedForTesting(final String command) { |
| String commandName = command.trim().split("\\s+")[0]; |
| HiveCommand testCommand = HiveCommand.find(new String[]{commandName}, HiveCommand.ONLY_FOR_TESTING); |
| return testCommand != null; |
| } |
| |
| private String getCommand(String tname) { |
| String commands = qMap.get(tname); |
| StringBuilder newCommands = new StringBuilder(commands.length()); |
| int lastMatchEnd = 0; |
| Matcher commentMatcher = Pattern.compile("^--.*$", Pattern.MULTILINE).matcher(commands); |
| // remove the comments |
| while (commentMatcher.find()) { |
| newCommands.append(commands.substring(lastMatchEnd, commentMatcher.start())); |
| lastMatchEnd = commentMatcher.end(); |
| } |
| newCommands.append(commands.substring(lastMatchEnd, commands.length())); |
| commands = newCommands.toString(); |
| return commands; |
| } |
| |
| public boolean shouldBeSkipped(String tname) { |
| return qSkipSet.contains(tname); |
| } |
| |
| private String getOutFileExtension(String fname) { |
| String outFileExtension = ".out"; |
| if (qJavaVersionSpecificOutput.contains(fname)) { |
| outFileExtension = ".java" + javaVersion + ".out"; |
| } |
| |
| return outFileExtension; |
| } |
| |
| public void convertSequenceFileToTextFile() throws Exception { |
| // Create an instance of hive in order to create the tables |
| testWarehouse = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); |
| db = Hive.get(conf); |
| |
| // Move all data from dest4_sequencefile to dest4 |
| HiveCompatUtil.getDriverResponseCode(drv, "FROM dest4_sequencefile INSERT OVERWRITE TABLE dest4 SELECT dest4_sequencefile.*"); |
| |
| |
| // Drop dest4_sequencefile |
| db.dropTable(DEFAULT_DATABASE_NAME, "dest4_sequencefile", true, true); |
| } |
| |
| public QTestProcessExecResult checkNegativeResults(String tname, Exception e) throws Exception { |
| |
| String outFileExtension = getOutFileExtension(tname); |
| |
| File qf = new File(outDir, tname); |
| String expf = outPath(outDir.toString(), tname.concat(outFileExtension)); |
| |
| File outf = null; |
| outf = new File(logDir); |
| outf = new File(outf, qf.getName().concat(outFileExtension)); |
| |
| FileWriter outfd = new FileWriter(outf); |
| if (e instanceof ParseException) { |
| outfd.write("Parse Error: "); |
| } else if (e instanceof SemanticException) { |
| outfd.write("Semantic Exception: \n"); |
| } else { |
| throw e; |
| } |
| |
| outfd.write(e.getMessage()); |
| outfd.close(); |
| |
| QTestProcessExecResult result = executeDiffCommand(outf.getPath(), expf, false, |
| qSortSet.contains(qf.getName())); |
| if (overWrite) { |
| overwriteResults(outf.getPath(), expf); |
| return QTestProcessExecResult.createWithoutOutput(0); |
| } |
| |
| return result; |
| } |
| |
| public QTestProcessExecResult checkParseResults(String tname, ASTNode tree) throws Exception { |
| |
| if (tree != null) { |
| String outFileExtension = getOutFileExtension(tname); |
| |
| File parseDir = new File(outDir, "parse"); |
| String expf = outPath(parseDir.toString(), tname.concat(outFileExtension)); |
| |
| File outf = null; |
| outf = new File(logDir); |
| outf = new File(outf, tname.concat(outFileExtension)); |
| |
| FileWriter outfd = new FileWriter(outf); |
| outfd.write(tree.toStringTree()); |
| outfd.close(); |
| |
| QTestProcessExecResult exitVal = executeDiffCommand(outf.getPath(), expf, false, false); |
| |
| if (overWrite) { |
| overwriteResults(outf.getPath(), expf); |
| return QTestProcessExecResult.createWithoutOutput(0); |
| } |
| |
| return exitVal; |
| } else { |
| throw new Exception("Parse tree is null"); |
| } |
| } |
| |
| /** |
| * Given the current configurations (e.g., hadoop version and execution mode), return |
| * the correct file name to compare with the current test run output. |
| * @param outDir The directory where the reference log files are stored. |
| * @param testName The test file name (terminated by ".out"). |
| * @return The file name appended with the configuration values if it exists. |
| */ |
| public String outPath(String outDir, String testName) { |
| String ret = (new File(outDir, testName)).getPath(); |
| // List of configurations. Currently the list consists of hadoop version and execution mode only |
| List<String> configs = new ArrayList<String>(); |
| configs.add(this.clusterType.toString()); |
| configs.add(this.hadoopVer); |
| |
| Deque<String> stack = new LinkedList<String>(); |
| StringBuilder sb = new StringBuilder(); |
| sb.append(testName); |
| stack.push(sb.toString()); |
| |
| // example file names are input1.q.out_mr_0.17 or input2.q.out_0.17 |
| for (String s: configs) { |
| sb.append('_'); |
| sb.append(s); |
| stack.push(sb.toString()); |
| } |
| while (stack.size() > 0) { |
| String fileName = stack.pop(); |
| File f = new File(outDir, fileName); |
| if (f.exists()) { |
| ret = f.getPath(); |
| break; |
| } |
| } |
| return ret; |
| } |
| |
| private Pattern[] toPattern(String[] patternStrs) { |
| Pattern[] patterns = new Pattern[patternStrs.length]; |
| for (int i = 0; i < patternStrs.length; i++) { |
| patterns[i] = Pattern.compile(patternStrs[i]); |
| } |
| return patterns; |
| } |
| |
| private void maskPatterns(Pattern[] patterns, String fname) throws Exception { |
| String maskPattern = "#### A masked pattern was here ####"; |
| String partialMaskPattern = "#### A PARTIAL masked pattern was here ####"; |
| |
| String line; |
| BufferedReader in; |
| BufferedWriter out; |
| |
| File file = new File(fname); |
| File fileOrig = new File(fname + ".orig"); |
| FileUtils.copyFile(file, fileOrig); |
| |
| in = new BufferedReader(new InputStreamReader(new FileInputStream(fileOrig), "UTF-8")); |
| out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8")); |
| |
| boolean lastWasMasked = false; |
| boolean partialMaskWasMatched = false; |
| Matcher matcher; |
| while (null != (line = in.readLine())) { |
| if (fsType == FsType.encrypted_hdfs) { |
| for (Pattern pattern : partialReservedPlanMask) { |
| matcher = pattern.matcher(line); |
| if (matcher.find()) { |
| line = partialMaskPattern + " " + matcher.group(0); |
| partialMaskWasMatched = true; |
| break; |
| } |
| } |
| } |
| else { |
| for (PatternReplacementPair prp : partialPlanMask) { |
| matcher = prp.pattern.matcher(line); |
| if (matcher.find()) { |
| line = line.replaceAll(prp.pattern.pattern(), prp.replacement); |
| partialMaskWasMatched = true; |
| } |
| } |
| } |
| |
| if (!partialMaskWasMatched) { |
| for (Pair<Pattern, String> pair : patternsWithMaskComments) { |
| Pattern pattern = pair.getLeft(); |
| String maskComment = pair.getRight(); |
| |
| matcher = pattern.matcher(line); |
| if (matcher.find()) { |
| line = matcher.replaceAll(maskComment); |
| partialMaskWasMatched = true; |
| break; |
| } |
| } |
| |
| for (Pattern pattern : patterns) { |
| line = pattern.matcher(line).replaceAll(maskPattern); |
| } |
| } |
| |
| if (line.equals(maskPattern)) { |
| // We're folding multiple masked lines into one. |
| if (!lastWasMasked) { |
| out.write(line); |
| out.write("\n"); |
| lastWasMasked = true; |
| partialMaskWasMatched = false; |
| } |
| } else { |
| out.write(line); |
| out.write("\n"); |
| lastWasMasked = false; |
| partialMaskWasMatched = false; |
| } |
| } |
| |
| in.close(); |
| out.close(); |
| } |
| |
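  // Whole-line masks for output that varies between runs (paths, timestamps, job ids, stack traces, etc.);
  // any matching text is replaced with the generic "#### A masked pattern was here ####" comment.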
| private final Pattern[] planMask = toPattern(new String[] { |
| ".*file:.*", |
| ".*pfile:.*", |
| ".*/tmp/.*", |
| ".*invalidscheme:.*", |
| ".*lastUpdateTime.*", |
| ".*lastAccessTime.*", |
| ".*lastModifiedTime.*", |
| ".*[Oo]wner.*", |
| ".*CreateTime.*", |
| ".*LastAccessTime.*", |
| ".*Location.*", |
| ".*LOCATION '.*", |
| ".*transient_lastDdlTime.*", |
| ".*last_modified_.*", |
| ".*at org.*", |
| ".*at sun.*", |
| ".*at java.*", |
| ".*at junit.*", |
| ".*Caused by:.*", |
| ".*LOCK_QUERYID:.*", |
| ".*LOCK_TIME:.*", |
| ".*grantTime.*", |
| ".*[.][.][.] [0-9]* more.*", |
| ".*job_[0-9_]*.*", |
| ".*job_local[0-9_]*.*", |
| ".*USING 'java -cp.*", |
| "^Deleted.*", |
| ".*DagName:.*", |
| ".*DagId:.*", |
| ".*Input:.*/data/files/.*", |
| ".*Output:.*/data/files/.*", |
| ".*total number of created files now is.*", |
| ".*.hive-staging.*", |
| ".*Warning.*", |
| "pk_-?[0-9]*_[0-9]*_[0-9]*", |
| "fk_-?[0-9]*_[0-9]*_[0-9]*", |
| "uk_-?[0-9]*_[0-9]*_[0-9]*", |
| "nn_-?[0-9]*_[0-9]*_[0-9]*", |
| ".*at com\\.sun\\.proxy.*", |
| ".*at com\\.jolbox.*", |
| ".*at com\\.zaxxer.*", |
| "org\\.apache\\.hadoop\\.hive\\.metastore\\.model\\.MConstraint@([0-9]|[a-z])*", |
| "^Repair: Added partition to metastore.*" |
| }); |
| |
| private final Pattern[] partialReservedPlanMask = toPattern(new String[] { |
| "data/warehouse/(.*?/)+\\.hive-staging" // the directory might be db/table/partition |
| //TODO: add more expected test result here |
| }); |
| /** |
| * Pattern to match and (partial) replacement text. |
| * For example, {"transaction":76,"bucketid":8249877}. We just want to mask 76 but a regex that |
| * matches just 76 will match a lot of other things. |
| */ |
| private final static class PatternReplacementPair { |
| private final Pattern pattern; |
| private final String replacement; |
| PatternReplacementPair(Pattern p, String r) { |
| pattern = p; |
| replacement = r; |
| } |
| } |
| private final PatternReplacementPair[] partialPlanMask; |
| { |
| ArrayList<PatternReplacementPair> ppm = new ArrayList<>(); |
| ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"transactionid\":[1-9][0-9]*,\"bucketid\":"), |
| "{\"transactionid\":### Masked txnid ###,\"bucketid\":")); |
| |
| ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9]+"), "attempt_#ID#")); |
| ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#")); |
| ppm.add(new PatternReplacementPair(Pattern.compile("task_[0-9_]+"), "task_#ID#")); |
| partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]); |
| } |
| /* This list may be modified by specific cli drivers to mask strings that change on every test */ |
| private final List<Pair<Pattern, String>> patternsWithMaskComments = |
| new ArrayList<Pair<Pattern, String>>() { |
| { |
| add(toPatternPair("(pblob|s3.?|swift|wasb.?).*hive-staging.*", |
| "### BLOBSTORE_STAGING_PATH ###")); |
| add(toPatternPair(PATH_HDFS_WITH_DATE_USER_GROUP_REGEX, |
| "### USER ### ### GROUP ###$3$4 ### HDFS DATE ### $6### HDFS PATH ###")); |
| add(toPatternPair(PATH_HDFS_REGEX, "$1### HDFS PATH ###")); |
| } |
| }; |
| |
| private Pair<Pattern, String> toPatternPair(String patternStr, String maskComment) { |
| return ImmutablePair.of(Pattern.compile(patternStr), maskComment); |
| } |
| |
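  /**
   * Registers an additional mask: the portion of an output line matching {@code patternStr} is
   * replaced with {@code maskComment} before the result file is diffed against the expected file.
   */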
| public void addPatternWithMaskComment(String patternStr, String maskComment) { |
| patternsWithMaskComments.add(toPatternPair(patternStr, maskComment)); |
| } |
| |
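  /**
   * Masks the volatile parts of the result file generated for {@code tname} and diffs it against
   * the expected output. In overwrite mode the expected file is replaced with the new result and
   * a zero exit value is returned instead.
   */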
| public QTestProcessExecResult checkCliDriverResults(String tname) throws Exception { |
| assert(qMap.containsKey(tname)); |
| |
| String outFileExtension = getOutFileExtension(tname); |
| String outFileName = outPath(outDir, tname + outFileExtension); |
| |
| File f = new File(logDir, tname + outFileExtension); |
| |
| maskPatterns(planMask, f.getPath()); |
| QTestProcessExecResult exitVal = executeDiffCommand(f.getPath(), |
| outFileName, false, |
| qSortSet.contains(tname)); |
| |
| if (overWrite) { |
| overwriteResults(f.getPath(), outFileName); |
| return QTestProcessExecResult.createWithoutOutput(0); |
| } |
| |
| return exitVal; |
| } |
| |
| |
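  /**
   * Masks every output file in the list and verifies via diff that consecutive outputs are
   * identical; returns the first non-zero diff result, if any.
   */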
| public QTestProcessExecResult checkCompareCliDriverResults(String tname, List<String> outputs) |
| throws Exception { |
| assert outputs.size() > 1; |
| maskPatterns(planMask, outputs.get(0)); |
| for (int i = 1; i < outputs.size(); ++i) { |
| maskPatterns(planMask, outputs.get(i)); |
| QTestProcessExecResult result = executeDiffCommand( |
| outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname)); |
| if (result.getReturnCode() != 0) { |
| System.out.println("Files don't match: " + outputs.get(i - 1) + " and " + outputs.get(i)); |
| return result; |
| } |
| } |
| return QTestProcessExecResult.createWithoutOutput(0); |
| } |
| |
| private static void overwriteResults(String inFileName, String outFileName) throws Exception { |
    // This could be simplified to Files.copy(source, target, REPLACE_EXISTING)
    // now that Hive requires Java 7+; for now it shells out to "cp".
| System.out.println("Overwriting results " + inFileName + " to " + outFileName); |
| int result = executeCmd(new String[]{ |
| "cp", |
| getQuotedString(inFileName), |
| getQuotedString(outFileName) |
| }).getReturnCode(); |
| if (result != 0) { |
| throw new IllegalStateException("Unexpected error while overwriting " + |
| inFileName + " with " + outFileName); |
| } |
| } |
| |
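  /**
   * Diffs two result files using the system "diff" command. When {@code sortResults} is set, both
   * files are first sorted into temporary copies, which are compared and then deleted.
   */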
| private static QTestProcessExecResult executeDiffCommand(String inFileName, |
| String outFileName, |
| boolean ignoreWhiteSpace, |
| boolean sortResults |
| ) throws Exception { |
| |
| QTestProcessExecResult result; |
| |
| if (sortResults) { |
      // On Windows, sort will try to open the output file in write mode, so we need to
      // close it first.
| SessionState ss = SessionState.get(); |
| if (ss != null && ss.out != null && ss.out != System.out) { |
| ss.out.close(); |
| } |
| |
| String inSorted = inFileName + SORT_SUFFIX; |
| String outSorted = outFileName + SORT_SUFFIX; |
| |
| sortFiles(inFileName, inSorted); |
| sortFiles(outFileName, outSorted); |
| |
| inFileName = inSorted; |
| outFileName = outSorted; |
| } |
| |
| ArrayList<String> diffCommandArgs = new ArrayList<String>(); |
| diffCommandArgs.add("diff"); |
| |
| // Text file comparison |
| diffCommandArgs.add("-a"); |
| |
| // Ignore changes in the amount of white space |
| if (ignoreWhiteSpace) { |
| diffCommandArgs.add("-b"); |
| } |
| |
| // Add files to compare to the arguments list |
| diffCommandArgs.add(getQuotedString(inFileName)); |
| diffCommandArgs.add(getQuotedString(outFileName)); |
| |
| result = executeCmd(diffCommandArgs); |
| |
| if (sortResults) { |
| new File(inFileName).delete(); |
| new File(outFileName).delete(); |
| } |
| |
| return result; |
| } |
| |
| private static void sortFiles(String in, String out) throws Exception { |
| int result = executeCmd(new String[]{ |
| "sort", |
| getQuotedString(in), |
| }, out, null).getReturnCode(); |
| if (result != 0) { |
| throw new IllegalStateException("Unexpected error while sorting " + in); |
| } |
| } |
| |
| private static QTestProcessExecResult executeCmd(Collection<String> args) throws Exception { |
| return executeCmd(args, null, null); |
| } |
| |
| private static QTestProcessExecResult executeCmd(String[] args) throws Exception { |
| return executeCmd(args, null, null); |
| } |
| |
| private static QTestProcessExecResult executeCmd(Collection<String> args, String outFile, |
| String errFile) throws Exception { |
| String[] cmdArray = args.toArray(new String[args.size()]); |
| return executeCmd(cmdArray, outFile, errFile); |
| } |
| |
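  /**
   * Runs the given command, sending its stdout/stderr either to the session console streams or to
   * the given files, and also captures stdout into the returned QTestProcessExecResult.
   */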
| private static QTestProcessExecResult executeCmd(String[] args, String outFile, |
| String errFile) throws Exception { |
| System.out.println("Running: " + org.apache.commons.lang3.StringUtils.join(args, ' ')); |
| |
| PrintStream out = outFile == null ? |
| SessionState.getConsole().getChildOutStream() : |
| new PrintStream(new FileOutputStream(outFile), true, "UTF-8"); |
| PrintStream err = errFile == null ? |
| SessionState.getConsole().getChildErrStream() : |
| new PrintStream(new FileOutputStream(errFile), true, "UTF-8"); |
| |
| Process executor = Runtime.getRuntime().exec(args); |
| |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| PrintStream str = new PrintStream(bos, true, "UTF-8"); |
| |
| StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err); |
| StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out, str); |
| |
| outPrinter.start(); |
| errPrinter.start(); |
| |
| int result = executor.waitFor(); |
| |
| outPrinter.join(); |
| errPrinter.join(); |
| |
| if (outFile != null) { |
| out.close(); |
| } |
| |
| if (errFile != null) { |
| err.close(); |
| } |
| |
| return QTestProcessExecResult. |
| create(result, new String(bos.toByteArray(), StandardCharsets.UTF_8)); |
| } |
| |
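  // Quoting is currently a no-op: arguments are handed to Runtime.exec() as an array, so no shell
  // quoting is required.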
| private static String getQuotedString(String str){ |
| return str; |
| } |
| |
| public ASTNode parseQuery(String tname) throws Exception { |
| return pd.parse(qMap.get(tname)); |
| } |
| |
| public void resetParser() throws SemanticException { |
| pd = new ParseDriver(); |
| queryState = HiveCompatUtil.getQueryState(conf); |
| sem = new SemanticAnalyzer(queryState); |
| } |
| |
| |
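  /**
   * Runs semantic analysis and plan generation on the given AST and returns the resulting root
   * tasks. The analyzer's input/output sets are cleared first so state does not leak between
   * calls.
   */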
| public List<Task<? extends Serializable>> analyzeAST(ASTNode ast) throws Exception { |
| |
| // Do semantic analysis and plan generation |
| Context ctx = new Context(conf); |
| while ((ast.getToken() == null) && (ast.getChildCount() > 0)) { |
| ast = (ASTNode) ast.getChild(0); |
| } |
| sem.getOutputs().clear(); |
| sem.getInputs().clear(); |
| sem.analyze(ast, ctx); |
| ctx.clear(); |
| return sem.getRootTasks(); |
| } |
| |
| public TreeMap<String, String> getQMap() { |
| return qMap; |
| } |
| |
| /** |
| * QTestSetup defines test fixtures which are reused across testcases, |
| * and are needed before any test can be run |
| */ |
| public static class QTestSetup |
| { |
| private MiniZooKeeperCluster zooKeeperCluster = null; |
| private int zkPort; |
| private ZooKeeper zooKeeper; |
| |
| public QTestSetup() { |
| } |
| |
| public void preTest(HiveConf conf) throws Exception { |
| |
| if (zooKeeperCluster == null) { |
| //create temp dir |
| String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY); |
| File tmpDir = Utilities.createTempDir(tmpBaseDir); |
| |
| zooKeeperCluster = new MiniZooKeeperCluster(); |
| zkPort = zooKeeperCluster.startup(tmpDir); |
| } |
| |
| if (zooKeeper != null) { |
| zooKeeper.close(); |
| } |
| |
| int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); |
| zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() { |
| @Override |
| public void process(WatchedEvent arg0) { |
| } |
| }); |
| |
| String zkServer = "localhost"; |
| conf.set("hive.zookeeper.quorum", zkServer); |
| conf.set("hive.zookeeper.client.port", "" + zkPort); |
| } |
| |
| public void postTest(HiveConf conf) throws Exception { |
| if (zooKeeperCluster == null) { |
| return; |
| } |
| |
| if (zooKeeper != null) { |
| zooKeeper.close(); |
| } |
| |
| ZooKeeperHiveLockManager.releaseAllLocks(conf); |
| } |
| |
| public void tearDown() throws Exception { |
| CuratorFrameworkSingleton.closeAndReleaseInstance(); |
| |
| if (zooKeeperCluster != null) { |
| zooKeeperCluster.shutdown(); |
| zooKeeperCluster = null; |
| } |
| } |
| } |
| |
| /** |
| * QTRunner: Runnable class for running a a single query file. |
| * |
| **/ |
| public static class QTRunner implements Runnable { |
| private final QTestUtil qt; |
| private final String fname; |
| |
| public QTRunner(QTestUtil qt, String fname) { |
| this.qt = qt; |
| this.fname = fname; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| // assumption is that environment has already been cleaned once globally |
| // hence each thread does not call cleanUp() and createSources() again |
| qt.cliInit(fname, false); |
| qt.executeClient(fname); |
| } catch (Throwable e) { |
| System.err.println("Query file " + fname + " failed with exception " |
| + e.getMessage()); |
| e.printStackTrace(); |
| outputTestFailureHelpMessage(); |
| } |
| } |
| } |
| |
| /** |
| * Setup to execute a set of query files. Uses QTestUtil to do so. |
| * |
| * @param qfiles |
| * array of input query files containing arbitrary number of hive |
| * queries |
| * @param resDir |
| * output directory |
| * @param logDir |
| * log directory |
| * @return one QTestUtil for each query file |
| */ |
| public static QTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir, |
| String logDir, String initScript, |
| String cleanupScript) throws Exception |
| { |
| QTestUtil[] qt = new QTestUtil[qfiles.length]; |
| for (int i = 0; i < qfiles.length; i++) { |
| qt[i] = new QTestUtil(resDir, logDir, MiniClusterType.none, null, "0.20", |
| initScript == null ? defaultInitScript : initScript, |
| cleanupScript == null ? defaultCleanupScript : cleanupScript, false); |
| qt[i].addFile(qfiles[i]); |
| qt[i].clearTestSideEffects(); |
| } |
| |
| return qt; |
| } |
| |
| /** |
| * Executes a set of query files in sequence. |
| * |
| * @param qfiles |
| * array of input query files containing arbitrary number of hive |
| * queries |
| * @param qt |
| * array of QTestUtils, one per qfile |
   * @return true if all queries passed, false otherwise
| */ |
| public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt) |
| throws Exception |
| { |
| boolean failed = false; |
| qt[0].cleanUp(); |
| qt[0].createSources(); |
| for (int i = 0; i < qfiles.length && !failed; i++) { |
| qt[i].clearTestSideEffects(); |
| qt[i].cliInit(qfiles[i].getName(), false); |
| qt[i].executeClient(qfiles[i].getName()); |
| QTestProcessExecResult result = qt[i].checkCliDriverResults(qfiles[i].getName()); |
| if (result.getReturnCode() != 0) { |
| failed = true; |
| StringBuilder builder = new StringBuilder(); |
| builder.append("Test ") |
| .append(qfiles[i].getName()) |
| .append(" results check failed with error code ") |
| .append(result.getReturnCode()); |
| if (Strings.isNotEmpty(result.getCapturedOutput())) { |
| builder.append(" and diff value ").append(result.getCapturedOutput()); |
| } |
| System.err.println(builder.toString()); |
| outputTestFailureHelpMessage(); |
| } |
| qt[i].clearPostTestEffects(); |
| } |
| return (!failed); |
| } |
| |
| /** |
| * Executes a set of query files parallel. |
| * |
| * Each query file is run in a separate thread. The caller has to arrange |
| * that different query files do not collide (in terms of destination tables) |
| * |
| * @param qfiles |
| * array of input query files containing arbitrary number of hive |
| * queries |
| * @param qt |
| * array of QTestUtils, one per qfile |
| * @return true if all queries passed, false otw |
| * |
| */ |
| public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt) |
| throws Exception |
| { |
| boolean failed = false; |
| |
| // in multithreaded mode - do cleanup/initialization just once |
| |
| qt[0].cleanUp(); |
| qt[0].createSources(); |
| qt[0].clearTestSideEffects(); |
| |
| QTRunner[] qtRunners = new QTRunner[qfiles.length]; |
| Thread[] qtThread = new Thread[qfiles.length]; |
| |
| for (int i = 0; i < qfiles.length; i++) { |
| qtRunners[i] = new QTRunner(qt[i], qfiles[i].getName()); |
| qtThread[i] = new Thread(qtRunners[i]); |
| } |
| |
| for (int i = 0; i < qfiles.length; i++) { |
| qtThread[i].start(); |
| } |
| |
| for (int i = 0; i < qfiles.length; i++) { |
| qtThread[i].join(); |
| QTestProcessExecResult result = qt[i].checkCliDriverResults(qfiles[i].getName()); |
| if (result.getReturnCode() != 0) { |
| failed = true; |
| StringBuilder builder = new StringBuilder(); |
| builder.append("Test ") |
| .append(qfiles[i].getName()) |
| .append(" results check failed with error code ") |
| .append(result.getReturnCode()); |
| if (Strings.isNotEmpty(result.getCapturedOutput())) { |
| builder.append(" and diff value ").append(result.getCapturedOutput()); |
| } |
| System.err.println(builder.toString()); |
| outputTestFailureHelpMessage(); |
| } |
| } |
| return (!failed); |
| } |
| |
| public static void outputTestFailureHelpMessage() { |
| System.err.println( |
| "See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, or check " + |
| "./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific " + |
| "test cases logs."); |
| System.err.flush(); |
| } |
| |
| private static String[] cachedQvFileList = null; |
| private static ImmutableList<String> cachedDefaultQvFileList = null; |
| private static Pattern qvSuffix = Pattern.compile("_[0-9]+.qv$", Pattern.CASE_INSENSITIVE); |
| |
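  /**
   * Returns the version files ({@code <tname>_<n>.qv}) available for the given test, falling back
   * to the cached "default" version files when the test has none.
   */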
| public static List<String> getVersionFiles(String queryDir, String tname) { |
| ensureQvFileList(queryDir); |
| List<String> result = getVersionFilesInternal(tname); |
| if (result == null) { |
| result = cachedDefaultQvFileList; |
| } |
| return result; |
| } |
| |
| private static void ensureQvFileList(String queryDir) { |
| if (cachedQvFileList != null) { |
| return; |
| } |
| // Not thread-safe. |
| System.out.println("Getting versions from " + queryDir); |
| cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.toLowerCase().endsWith(".qv"); |
| } |
| }); |
| if (cachedQvFileList == null) { |
| return; // no files at all |
| } |
| Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER); |
| List<String> defaults = getVersionFilesInternal("default"); |
| cachedDefaultQvFileList = (defaults != null) |
| ? ImmutableList.copyOf(defaults) : ImmutableList.<String>of(); |
| } |
| |
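  // Binary-searches the sorted .qv file list for the insertion point of tname, then walks forward
  // collecting files of the form tname_<n>.qv. An exact hit is unexpected because every cached
  // entry ends in ".qv", so it is treated as an error.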
| private static List<String> getVersionFilesInternal(String tname) { |
| if (cachedQvFileList == null) { |
| return new ArrayList<String>(); |
| } |
| int pos = Arrays.binarySearch(cachedQvFileList, tname, String.CASE_INSENSITIVE_ORDER); |
| if (pos >= 0) { |
| throw new BuildException("Unexpected file list element: " + cachedQvFileList[pos]); |
| } |
| List<String> result = null; |
| for (pos = (-pos - 1); pos < cachedQvFileList.length; ++pos) { |
| String candidate = cachedQvFileList[pos]; |
| if (candidate.length() <= tname.length() |
| || !tname.equalsIgnoreCase(candidate.substring(0, tname.length())) |
| || !qvSuffix.matcher(candidate.substring(tname.length())).matches()) { |
| break; |
| } |
| if (result == null) { |
| result = new ArrayList<String>(); |
| } |
| result.add(candidate); |
| } |
| return result; |
| } |
| |
| public void failed(int ecode, String fname, String debugHint) { |
| String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; |
    String message = "Client execution failed with error code = " + ecode +
        (command != null ? " running \"" + command + "\"" : "") + " fname=" + fname + " " +
        (debugHint != null ? debugHint : "");
| LOG.error(message); |
| Assert.fail(message); |
| } |
| |
  // for negative tests that unexpectedly succeeded; no need to print the query string
| public void failed(String fname, String debugHint) { |
| Assert.fail( |
| "Client Execution was expected to fail, but succeeded with error code 0 for fname=" + |
| fname + (debugHint != null ? (" " + debugHint) : "")); |
| } |
| |
| public void failedDiff(int ecode, String fname, String debugHint) { |
| String message = |
| "Client Execution succeeded but contained differences " + |
| "(error code = " + ecode + ") after executing " + |
| fname + (debugHint != null ? (" " + debugHint) : ""); |
| LOG.error(message); |
| Assert.fail(message); |
| } |
| |
| public void failed(Exception e, String fname, String debugHint) { |
| String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; |
| System.err.println("Failed query: " + fname); |
| System.err.flush(); |
| Assert.fail("Unexpected exception " + |
| org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" + |
| (command != null ? " running " + command : "") + |
| (debugHint != null ? debugHint : "")); |
| } |
| |
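  /**
   * Reads query file names (one per line) from {@code qFileNamesFile} and adds a test to the
   * suite for each entry, restricted to {@code qFilesToExecute} when that set is non-empty.
   */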
| public static void addTestsToSuiteFromQfileNames( |
| String qFileNamesFile, |
| Set<String> qFilesToExecute, |
| TestSuite suite, |
| Object setup, |
| SuiteAddTestFunctor suiteAddTestCallback) { |
| try { |
| File qFileNames = new File(qFileNamesFile); |
| FileReader fr = new FileReader(qFileNames.getCanonicalFile()); |
| BufferedReader br = new BufferedReader(fr); |
| String fName = null; |
| |
| while ((fName = br.readLine()) != null) { |
        if (fName.trim().isEmpty()) {
| continue; |
| } |
| |
| int eIdx = fName.indexOf('.'); |
| |
| if (eIdx == -1) { |
| continue; |
| } |
| |
| String tName = fName.substring(0, eIdx); |
| |
| if (qFilesToExecute.isEmpty() || qFilesToExecute.contains(fName)) { |
| suiteAddTestCallback.addTestToSuite(suite, setup, tName); |
| } |
| } |
| br.close(); |
| } catch (Exception e) { |
| Assert.fail("Unexpected exception " + org.apache.hadoop.util.StringUtils.stringifyException(e)); |
| } |
| } |
| |
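  /**
   * Loads pre-generated TPC-DS (30 TB scale) table and column statistics into the Derby
   * metastore: the stats tables are recreated from the upgrade script, the bundled CSV exports
   * are rewritten to use the current table ids, and the results are bulk-loaded with
   * SYSCS_UTIL.SYSCS_IMPORT_TABLE.
   */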
| public static void setupMetaStoreTableColumnStatsFor30TBTPCDSWorkload(HiveConf conf) { |
| Connection conn = null; |
| ArrayList<Statement> statements = new ArrayList<Statement>(); // list of Statements, PreparedStatements |
| |
| try { |
| Properties props = new Properties(); // connection properties |
| props.put("user", conf.get("javax.jdo.option.ConnectionUserName")); |
| props.put("password", conf.get("javax.jdo.option.ConnectionPassword")); |
| conn = DriverManager.getConnection(conf.get("javax.jdo.option.ConnectionURL"), props); |
| ResultSet rs = null; |
| Statement s = conn.createStatement(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Connected to metastore database "); |
| } |
| |
| String mdbPath = HIVE_ROOT + "/data/files/tpcds-perf/metastore_export/"; |
| |
| // Setup the table column stats |
| BufferedReader br = new BufferedReader( |
| new FileReader( |
| new File(HIVE_ROOT + "/metastore/scripts/upgrade/derby/022-HIVE-11107.derby.sql"))); |
| String command; |
| |
| s.execute("DROP TABLE APP.TABLE_PARAMS"); |
| s.execute("DROP TABLE APP.TAB_COL_STATS"); |
| // Create the column stats table |
| while ((command = br.readLine()) != null) { |
| if (!command.endsWith(";")) { |
| continue; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Going to run command : " + command); |
| } |
| try { |
| PreparedStatement psCommand = conn.prepareStatement(command.substring(0, command.length()-1)); |
| statements.add(psCommand); |
| psCommand.execute(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("successfully completed " + command); |
| } |
| } catch (SQLException e) { |
| LOG.info("Got SQL Exception " + e.getMessage()); |
| } |
| } |
| br.close(); |
| |
      java.nio.file.Path tabColStatsCsv = FileSystems.getDefault().getPath(mdbPath, "csv", "TAB_COL_STATS.txt.bz2");
| java.nio.file.Path tabParamsCsv = FileSystems.getDefault().getPath(mdbPath, "csv", "TABLE_PARAMS.txt.bz2"); |
| |
| // Set up the foreign key constraints properly in the TAB_COL_STATS data |
| String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY); |
| java.nio.file.Path tmpFileLoc1 = FileSystems.getDefault().getPath(tmpBaseDir, "TAB_COL_STATS.txt"); |
| java.nio.file.Path tmpFileLoc2 = FileSystems.getDefault().getPath(tmpBaseDir, "TABLE_PARAMS.txt"); |
| |
| class MyComp implements Comparator<String> { |
| @Override |
| public int compare(String str1, String str2) { |
| if (str2.length() != str1.length()) { |
| return str2.length() - str1.length(); |
| } |
| return str1.compareTo(str2); |
| } |
| } |
| |
| final SortedMap<String, Integer> tableNameToID = new TreeMap<String, Integer>(new MyComp()); |
| |
| rs = s.executeQuery("SELECT * FROM APP.TBLS"); |
| while(rs.next()) { |
| String tblName = rs.getString("TBL_NAME"); |
| Integer tblId = rs.getInt("TBL_ID"); |
| tableNameToID.put(tblName, tblId); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Resultset : " + tblName + " | " + tblId); |
| } |
| } |
| |
| final Map<String, Map<String, String>> data = new HashMap<>(); |
| rs = s.executeQuery("select TBLS.TBL_NAME, a.COLUMN_NAME, a.TYPE_NAME from " |
| + "(select COLUMN_NAME, TYPE_NAME, SDS.SD_ID from APP.COLUMNS_V2 join APP.SDS on SDS.CD_ID = COLUMNS_V2.CD_ID) a" |
| + " join APP.TBLS on TBLS.SD_ID = a.SD_ID"); |
| while (rs.next()) { |
| String tblName = rs.getString(1); |
| String colName = rs.getString(2); |
| String typeName = rs.getString(3); |
| Map<String, String> cols = data.get(tblName); |
| if (null == cols) { |
| cols = new HashMap<>(); |
| } |
| cols.put(colName, typeName); |
| data.put(tblName, cols); |
| } |
| |
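      // Rewrite each TAB_COL_STATS row into the '@'-delimited import format: prefix it with the
      // default database, table, column and type, then append the table id and an empty bit
      // vector.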
| BufferedReader reader = new BufferedReader(new InputStreamReader( |
| new BZip2CompressorInputStream(Files.newInputStream(tabColStatsCsv, StandardOpenOption.READ)))); |
| |
| Stream<String> replaced = reader.lines().parallel().map(str-> { |
| String[] splits = str.split(","); |
| String tblName = splits[0]; |
| String colName = splits[1]; |
| Integer tblID = tableNameToID.get(tblName); |
| StringBuilder sb = new StringBuilder("default@"+tblName + "@" + colName + "@" + data.get(tblName).get(colName)+"@"); |
| for (int i = 2; i < splits.length; i++) { |
| sb.append(splits[i]+"@"); |
| } |
| // Add tbl_id and empty bitvector |
| return sb.append(tblID).append("@").toString(); |
| }); |
| |
| Files.write(tmpFileLoc1, (Iterable<String>)replaced::iterator); |
| replaced.close(); |
| reader.close(); |
| |
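      // Rewrite TABLE_PARAMS rows: replace the leading table name with its numeric id, and
      // synthesize a COLUMN_STATS_ACCURATE entry per table from its column list, appended after
      // the rewritten rows.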
| BufferedReader reader2 = new BufferedReader(new InputStreamReader( |
| new BZip2CompressorInputStream(Files.newInputStream(tabParamsCsv, StandardOpenOption.READ)))); |
| final Map<String, String> colStats = new ConcurrentHashMap<>(); |
| Stream<String> replacedStream = reader2.lines().parallel().map(str-> { |
| String[] splits = str.split("_@"); |
| String tblName = splits[0]; |
| Integer tblId = tableNameToID.get(tblName); |
| Map<String, String> cols = data.get(tblName); |
| StringBuilder sb = new StringBuilder(); |
| sb.append("{\"COLUMN_STATS\":{"); |
| for (String colName : cols.keySet()) { |
| sb.append("\""+colName+"\":\"true\","); |
| } |
| sb.append("},\"BASIC_STATS\":\"true\"}"); |
| colStats.put(tblId.toString(), sb.toString()); |
| |
| return tblId.toString() + "@" + splits[1]; |
| }); |
| |
| Files.write(tmpFileLoc2, (Iterable<String>)replacedStream::iterator); |
| Files.write(tmpFileLoc2, (Iterable<String>)colStats.entrySet().stream() |
| .map(map->map.getKey()+"@COLUMN_STATS_ACCURATE@"+map.getValue())::iterator, StandardOpenOption.APPEND); |
| |
| replacedStream.close(); |
| reader2.close(); |
| // Load the column stats and table params with 30 TB scale |
| String importStatement1 = "CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE(null, '" + "TAB_COL_STATS" + |
| "', '" + tmpFileLoc1.toAbsolutePath().toString() + |
| "', '@', null, 'UTF-8', 1)"; |
| String importStatement2 = "CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE(null, '" + "TABLE_PARAMS" + |
| "', '" + tmpFileLoc2.toAbsolutePath().toString() + |
| "', '@', null, 'UTF-8', 1)"; |
| try { |
| PreparedStatement psImport1 = conn.prepareStatement(importStatement1); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Going to execute : " + importStatement1); |
| } |
| statements.add(psImport1); |
| psImport1.execute(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("successfully completed " + importStatement1); |
| } |
| PreparedStatement psImport2 = conn.prepareStatement(importStatement2); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Going to execute : " + importStatement2); |
| } |
| statements.add(psImport2); |
| psImport2.execute(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("successfully completed " + importStatement2); |
| } |
| } catch (SQLException e) { |
| LOG.info("Got SQL Exception " + e.getMessage()); |
| } |
| } catch (FileNotFoundException e1) { |
| LOG.info("Got File not found Exception " + e1.getMessage()); |
| } catch (IOException e1) { |
| LOG.info("Got IOException " + e1.getMessage()); |
| } catch (SQLException e1) { |
| LOG.info("Got SQLException " + e1.getMessage()); |
| } finally { |
      // Close all Statements and PreparedStatements (PreparedStatement extends Statement)
      while (!statements.isEmpty()) {
        Statement st = statements.remove(0);
        try {
          if (st != null) {
            st.close();
          }
        } catch (SQLException sqle) {
          // ignore errors while closing statements
        }
      }
| |
| //Connection |
| try { |
| if (conn != null) { |
| conn.close(); |
| conn = null; |
| } |
| } catch (SQLException sqle) { |
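        // ignore errors while closing the connection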
| } |
| } |
| } |
| |
| private static String getHiveRoot() { |
| String path; |
| if (System.getProperty("hive.root") != null) { |
| try { |
| path = new File(System.getProperty("hive.root")).getCanonicalPath(); |
| } catch (IOException e) { |
| throw new RuntimeException("error getting hive.root", e); |
| } |
| } else { |
| path = new File("target").getAbsolutePath(); |
| } |
| return ensurePathEndsInSlash(new File(path).getAbsolutePath()); |
| } |
| |
| public static String ensurePathEndsInSlash(String path) { |
| if (path == null) { |
| throw new NullPointerException("Path cannot be null"); |
| } |
| if (path.endsWith(File.separator)) { |
| return path; |
| } else { |
| return path + File.separator; |
| } |
| } |
| |
| } |