/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClientWithLocalCache;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
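
/**
 * Base class for ACID (transactional) SQL tests: it builds a HiveConf backed by a local
 * warehouse directory, prepares the transaction metastore tables, creates a set of test
 * tables, and exposes helpers for running statements and compactor threads.
 *
 * A minimal sketch of a concrete subclass (the class name and directory below are
 * hypothetical, for illustration only):
 * <pre>{@code
 * public class TestMyTxnCommands extends TxnCommandsBaseForTests {
 *   protected String getTestDataDir() {
 *     return new File(System.getProperty("java.io.tmpdir"),
 *         "TestMyTxnCommands-" + System.currentTimeMillis()).getPath();
 *   }
 * }
 * }</pre>
 */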
public abstract class TxnCommandsBaseForTests {
  private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class);
  //bucket count for test tables; set it to 1 for easier debugging
  static final int BUCKET_COUNT = 2;

  @Rule
  public TestName testName = new TestName();

  protected HiveConf hiveConf;
  protected Driver d;
  protected TxnStore txnHandler;

  public enum Table {
    ACIDTBL("acidTbl"),
    ACIDTBLPART("acidTblPart"),
    ACIDTBL2("acidTbl2"),
    NONACIDORCTBL("nonAcidOrcTbl"),
    NONACIDORCTBL2("nonAcidOrcTbl2"),
    NONACIDNONBUCKET("nonAcidNonBucket");

    final String name;

    @Override
    public String toString() {
      return name;
    }

    Table(String name) {
      this.name = name;
    }
  }

  public TxnStore getTxnStore() {
    return txnHandler;
  }

  @Before
  public void setUp() throws Exception {
    setUpInternal();
    // set up metastore client cache
    if (hiveConf.getBoolVar(HiveConf.ConfVars.MSC_CACHE_ENABLED)) {
      HiveMetaStoreClientWithLocalCache.init();
    }
  }

  void initHiveConf() {
    hiveConf = new HiveConf(this.getClass());
  }

  void setUpInternal() throws Exception {
    initHiveConf();
    Path workDir = new Path(System.getProperty("test.tmp.dir",
        "target" + File.separator + "test" + File.separator + "tmp"));
    hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName()
        + File.separator + "mapred" + File.separator + "local");
    hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName()
        + File.separator + "mapred" + File.separator + "system");
    hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName()
        + File.separator + "mapred" + File.separator + "staging");
    hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName()
        + File.separator + "mapred" + File.separator + "temp");
    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
    hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
    hiveConf.setBoolean("mapred.input.dir.recursive", true);
    TxnDbUtil.setConfValues(hiveConf);
    txnHandler = TxnUtils.getTxnStore(hiveConf);
    TxnDbUtil.prepDb(hiveConf);

    File f = new File(getWarehouseDir());
    if (f.exists()) {
      FileUtil.fullyDelete(f);
    }
    if (!f.mkdirs()) {
      throw new RuntimeException("Could not create " + getWarehouseDir());
    }

    SessionState ss = SessionState.start(hiveConf);
    ss.applyAuthorizationPolicy();
    d = new Driver(new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build());
    d.setMaxRows(10000);

    dropTables();
    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
        + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into "
        + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
        + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
        + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
    runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into "
        + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='false')");
  }

  protected void dropTables() throws Exception {
    for (Table t : Table.values()) {
      runStatementOnDriver("drop table if exists " + t);
    }
  }

  @After
  public void tearDown() throws Exception {
    try {
      if (d != null) {
        dropTables();
        d.close();
        d.destroy();
        d = null;
      }
    } finally {
      TxnDbUtil.cleanDb(hiveConf);
      FileUtils.deleteDirectory(new File(getTestDataDir()));
    }
  }

  protected String getWarehouseDir() {
    return getTestDataDir() + "/warehouse";
  }

  protected abstract String getTestDataDir();

  /**
   * Takes raw data and turns it into a list of strings, formatted as if returned by
   * {@link Driver#getResults(List)}: one row per string, columns separated by tabs.
   * Rows are sorted lexicographically, comparing column values numerically.
   */
  public static List<String> stringifyValues(int[][] rowsIn) {
    assert rowsIn.length > 0;
    int[][] rows = rowsIn.clone();
    Arrays.sort(rows, new RowComp());
    List<String> rs = new ArrayList<>();
    for (int[] row : rows) {
      assert row.length > 0;
      StringBuilder sb = new StringBuilder();
      for (int value : row) {
        sb.append(value).append("\t");
      }
      sb.setLength(sb.length() - 1);
      rs.add(sb.toString());
    }
    return rs;
  }
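
  // Illustrative example (values are made up): stringifyValues(new int[][]{{3, 4}, {1, 2}})
  // returns ["1\t2", "3\t4"]: rows sorted, columns tab-separated.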

  static class RowComp implements Comparator<int[]> {
    @Override
    public int compare(int[] row1, int[] row2) {
      assert row1 != null && row2 != null && row1.length == row2.length;
      for (int i = 0; i < row1.length; i++) {
        int comp = Integer.compare(row1[i], row2[i]);
        if (comp != 0) {
          return comp;
        }
      }
      return 0;
    }
  }

  protected String makeValuesClause(int[][] rows) {
    return TestTxnCommands2.makeValuesClause(rows);
  }
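
  // Illustrative example (assuming TestTxnCommands2.makeValuesClause renders rows as a SQL VALUES list):
  // makeValuesClause(new int[][]{{1, 2}, {3, 4}}) would produce something like " values(1,2),(3,4)".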

  public static void runWorker(HiveConf hiveConf) throws Exception {
    runCompactorThread(hiveConf, CompactorThreadType.WORKER);
  }

  public static void runCleaner(HiveConf hiveConf) throws Exception {
    runCompactorThread(hiveConf, CompactorThreadType.CLEANER);
  }

  public static void runInitiator(HiveConf hiveConf) throws Exception {
    runCompactorThread(hiveConf, CompactorThreadType.INITIATOR);
  }

  private enum CompactorThreadType {INITIATOR, WORKER, CLEANER}

  private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type)
      throws Exception {
    AtomicBoolean stop = new AtomicBoolean(true);
    CompactorThread t;
    switch (type) {
      case INITIATOR:
        t = new Initiator();
        break;
      case WORKER:
        t = new Worker();
        break;
      case CLEANER:
        t = new Cleaner();
        break;
      default:
        throw new IllegalArgumentException("Unknown type: " + type);
    }
    t.setThreadId((int) t.getId());
    t.setConf(hiveConf);
    t.init(stop);
    t.run();
  }
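
  // Typical sequence in a compaction test (illustrative; the statement below is a sketch):
  //   runStatementOnDriver("alter table acidTbl compact 'major'"); // queue a compaction request
  //   runWorker(hiveConf);   // pick up the request and perform the compaction
  //   runCleaner(hiveConf);  // remove the now-obsolete delta directories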

  protected List<String> runStatementOnDriver(String stmt) throws Exception {
    LOG.info("Running the query: " + stmt);
    try {
      d.run(stmt);
    } catch (CommandProcessorException e) {
      throw new RuntimeException(stmt + " failed: " + e, e);
    }
    List<String> rs = new ArrayList<>();
    d.getResults(rs);
    return rs;
  }

  protected CommandProcessorException runStatementOnDriverNegative(String stmt) {
    try {
      d.run(stmt);
    } catch (CommandProcessorException e) {
      return e;
    }
    throw new RuntimeException("Didn't get expected failure!");
  }
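
  // Illustrative usage (the statement and expected error text are made up; exact messages
  // vary by Hive version):
  //   CommandProcessorException e = runStatementOnDriverNegative("insert into no_such_table values(1)");
  //   Assert.assertTrue(e.getMessage().contains("Table not found"));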

  /**
   * Runs a vectorized EXPLAIN on the query and checks whether the plan is vectorized as expected.
   * @param vectorized {@code true} - assert that it's vectorized
   */
  void assertVectorized(boolean vectorized, String query) throws Exception {
    List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
    for (String line : rs) {
      if (line != null && line.contains("Execution mode: vectorized")) {
        Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
        return;
      }
    }
    Assert.assertFalse("Didn't find expected 'vectorized' in plan", vectorized);
  }

  /**
   * Asserts that the actual set of files under {@code rootPath} matches {@code expectedFiles}.
   * @param expectedFiles suffixes of the expected Paths, starting at {@code tableName}
   * @param rootPath table or partition root where to start looking for actual files, recursively
   */
  void assertExpectedFileSet(Set<String> expectedFiles, String rootPath, String tableName) throws Exception {
    // capture the path suffix starting at the table name, for files under delta/delete_delta/base dirs
    Pattern pattern = Pattern.compile("(.+)/(" + tableName + "/(?:delete_delta|delta|base).+)");
    FileSystem fs = FileSystem.get(hiveConf);
    Set<String> actualFiles = new HashSet<>();
    RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(rootPath), true);
    while (remoteIterator.hasNext()) {
      LocatedFileStatus lfs = remoteIterator.next();
      if (!lfs.isDirectory()
          && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) {
        String p = lfs.getPath().toString();
        Matcher matcher = pattern.matcher(p);
        if (matcher.matches()) {
          actualFiles.add(matcher.group(2));
        }
      }
    }
    Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
  }
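
  // Illustrative expected-file suffixes; directory names follow the ACID naming scheme and the
  // exact write ids/bucket numbers depend on the test:
  //   "acidtbl/delta_0000001_0000001_0000/bucket_00000"
  //   "acidtbl/base_0000002/bucket_00000"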

  void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
    LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
    logResult(LOG, rs);
    Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs,
        expected.length, rs.size());
    //verify data and layout
    for (int i = 0; i < expected.length; i++) {
      Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i) + "; expected " + expected[i][0],
          rs.get(i).startsWith(expected[i][0]));
      if (checkFileName) {
        Assert.assertTrue("Actual line (file) " + i + " file: " + rs.get(i),
            rs.get(i).endsWith(expected[i][1]) || rs.get(i).matches(expected[i][1]));
      }
    }
  }

  void logResult(Logger LOG, List<String> rs) {
    StringBuilder sb = new StringBuilder();
    for (String s : rs) {
      sb.append(s).append('\n');
    }
    LOG.info(sb.toString());
  }

  /**
   * We have to use a different query to check results for vectorized tests because, to get the
   * file name info, we need {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME},
   * which currently makes the query non-vectorizable. This means we can't check the file name
   * for the vectorized version of the test.
   */
  protected void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG)
      throws Exception {
    List<String> rs = runStatementOnDriver(query);
    checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
    assertVectorized(isVectorized, query);
  }

  void dropTable(String[] tabs) throws Exception {
    for (String tab : tabs) {
      d.run("drop table if exists " + tab);
    }
  }

  Driver swapDrivers(Driver otherDriver) {
    Driver tmp = d;
    d = otherDriver;
    return tmp;
  }
}