blob: ba50a2bd6e29bff42ab4d79c5ce7a933c19f8a32 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.orc.TestVectorizedOrcAcidRowBatchReader;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager;
public class TestTxnCommands3 extends TxnCommandsBaseForTests {
/** Logger that the tests in this class hand to {@code checkResult}. */
private static final Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class);
/**
 * Scratch directory for this test class: a path under {@code java.io.tmpdir} made of the
 * canonical class name plus the current time in milliseconds, with every backslash
 * replaced by a forward slash.
 */
private static final String TEST_DATA_DIR =
    new File(System.getProperty("java.io.tmpdir") + File.separator
        + TestTxnCommands3.class.getCanonicalName() + "-" + System.currentTimeMillis())
        .getPath()
        .replaceAll("\\\\", "/");
/**
 * Returns the scratch directory path held in {@code TEST_DATA_DIR}.
 *
 * @return the value of {@code TEST_DATA_DIR}
 */
@Override
protected String getTestDataDir() {
return TEST_DATA_DIR;
}
/**
 * Renames a table twice (first inside its database, then into another database) and checks
 * the row counts of the transaction metadata tables under the old and the new table names.
 */
@Test
public void testRenameTable() throws Exception {
  MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
  HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);

  runStatementOnDriver("drop database if exists mydb1 cascade");
  runStatementOnDriver("drop database if exists mydb2 cascade");
  runStatementOnDriver("create database mydb1");
  runStatementOnDriver("create database mydb2");
  runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc");
  runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)");
  //put something in WRITE_SET
  runStatementOnDriver("update mydb1.T set b = 6 where b = 5");
  runStatementOnDriver("alter table mydb1.T compact 'minor'");

  // First rename: T -> S within mydb1.
  runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S");

  String[][] expectedRows = new String[][] {
      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
          "s/delta_0000001_0000001_0000/bucket_00000_0"},
      {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
          "s/delta_0000002_0000002_0000/bucket_00000"}};
  String dataQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S";
  checkResult(expectedRows, dataQuery, false, "check data", LOG);

  int rowCount;
  String completedDump;

  // No metadata rows may remain under the old name 't'.
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'");
  Assert.assertEquals(0, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'");
  Assert.assertEquals(0, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from WRITE_SET where WS_TABLE='t'");
  Assert.assertEquals(0, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'");
  Assert.assertEquals(0, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'");
  Assert.assertEquals(0, rowCount);

  // The same tables queried by the new name 's'.
  completedDump = TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS");
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'");
  Assert.assertEquals(completedDump, 2, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'");
  Assert.assertEquals(1, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from WRITE_SET where WS_TABLE='s'");
  Assert.assertEquals(1, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'");
  Assert.assertEquals(3, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'");
  Assert.assertEquals(1, rowCount);

  // Second rename: mydb1.S -> mydb2.bar (moves the table to another database).
  runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar");

  completedDump = TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS");
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'");
  Assert.assertEquals(completedDump, 2, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'");
  Assert.assertEquals(1, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from WRITE_SET where WS_TABLE='bar'");
  Assert.assertEquals(1, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'");
  Assert.assertEquals(4, rowCount);
  rowCount = TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'");
  Assert.assertEquals(1, rowCount);
}
/** Runs the delete-event pruning scenario with {@code FILTER_DELETE_EVENTS} set to true. */
@Test
public void testDeleteEventPruningOn() throws Exception {
  hiveConf.setBoolVar(HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
  testDeleteEventPruning();
}
/** Runs the delete-event pruning scenario with {@code FILTER_DELETE_EVENTS} set to false. */
@Test
public void testDeleteEventPruningOff() throws Exception {
  hiveConf.setBoolVar(HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
  testDeleteEventPruning();
}
/**
 * Scenario shared by {@link #testDeleteEventPruningOn()} and
 * {@link #testDeleteEventPruningOff()}: run with and w/o event filtering enabled - should get
 * the same results.
 * See {@link TestVectorizedOrcAcidRowBatchReader#testDeleteEventFiltering()}.
 *
 * todo: add .q test using VerifyNumReducersHook.num.reducers to make sure
 * it does have 1 split for each input file.
 * Will need to create VerifyNumMappersHook
 *
 * Also, consider
 * HiveSplitGenerator.java
 * RAW_INPUT_SPLITS and GROUPED_INPUT_SPLITS are the counters before and
 * after grouping splits PostExecTezSummaryPrinter post exec hook can be
 * used to print out specific counters
 *
 * @throws Exception if any statement, the compaction worker or an assertion fails
 */
private void testDeleteEventPruning() throws Exception {
  HiveConf.setBoolVar(hiveConf,
      HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
  dropTable(new String[] {"T"});
  runStatementOnDriver(
      "create transactional table T(a int, b int) stored as orc");
  runStatementOnDriver("insert into T values(1,2),(4,5)");
  runStatementOnDriver("insert into T values(4,6),(1,3)");
  runStatementOnDriver("delete from T where a = 1");
  // The query is still executed, but its rows are not kept: nothing in this method read
  // them; the actual verification is done by checkResult below.
  runStatementOnDriver(
      "select ROW__ID, a, b from T order by a, b");

  // Vectorization was switched on at the top of this method, so this reads back true.
  boolean isVectorized =
      hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
  String testQuery = isVectorized ?
      "select ROW__ID, a, b from T order by a, b" :
      "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b";
  String[][] expected = new String[][]{
      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5",
          "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
      {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
          "warehouse/t/delta_0000002_0000002_0000/bucket_00000"}};
  checkResult(expected, testQuery, isVectorized, "after delete", LOG);

  runStatementOnDriver("alter table T compact 'MAJOR'");
  runWorker(hiveConf);

  // Exactly one compaction is expected, in the "cleaning" state, run as a local job.
  TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
  ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
  Assert.assertEquals("Unexpected number of compactions in history",
      1, resp.getCompactsSize());
  Assert.assertEquals("Unexpected 0 compaction state",
      TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
  Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId()
      .startsWith("job_local"));

  // NOTE(review): the base_0000001/base_0000002 file names below look unusual for the output
  // of a single major compaction; presumably the file-name column is not compared when
  // isVectorized is true (the query does not select INPUT__FILE__NAME) - confirm against
  // checkResult.
  String[][] expected2 = new String[][]{
      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5",
          "warehouse/t/base_0000001/bucket_00000"},
      {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
          "warehouse/t/base_0000002/bucket_00000"}};
  checkResult(expected2, testQuery, isVectorized, "after compaction", LOG);
}
/**
 * HIVE-19985: reads the same table with {@code OPTIMIZE_ACID_META_COLUMNS} switched on and
 * then off, both before and after a major compaction, and expects the same rows every time.
 */
@Test
public void testAcidMetaColumsDecode() throws Exception {
  final String selectAll = "select a, b from T order by a, b";
  final boolean[] optimizeSettings = {true, false};

  //this only applies in vectorized mode
  HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
  hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
  runStatementOnDriver("drop table if exists T");
  runStatementOnDriver("create table T (a int, b int) stored as orc");

  int[][] firstBatch = {{1, 2}, {3, 4}};
  runStatementOnDriver("insert into T" + makeValuesClause(firstBatch));
  int[][] secondBatch = {{5, 6}, {7, 8}};
  runStatementOnDriver("insert into T" + makeValuesClause(secondBatch));
  int[][] allRows = {{1, 2}, {3, 4}, {5, 6}, {7, 8}};

  // Before compaction: optimization on, then off.
  for (boolean optimize : optimizeSettings) {
    hiveConf.setBoolVar(HiveConf.ConfVars.OPTIMIZE_ACID_META_COLUMNS, optimize);
    List<String> rows = runStatementOnDriver(selectAll);
    Assert.assertEquals(stringifyValues(allRows), rows);
  }

  runStatementOnDriver("alter table T compact 'major'");
  TestTxnCommands2.runWorker(hiveConf);

  //check status of compaction job
  TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
  ShowCompactResponse compactions = txnHandler.showCompact(new ShowCompactRequest());
  Assert.assertEquals("Unexpected number of compactions in history",
      1, compactions.getCompactsSize());
  Assert.assertEquals("Unexpected 0 compaction state",
      TxnStore.CLEANING_RESPONSE, compactions.getCompacts().get(0).getState());
  Assert.assertTrue(
      compactions.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));

  // After compaction: optimization on, then off.
  for (boolean optimize : optimizeSettings) {
    hiveConf.setBoolVar(HiveConf.ConfVars.OPTIMIZE_ACID_META_COLUMNS, optimize);
    List<String> rows = runStatementOnDriver(selectAll);
    Assert.assertEquals(stringifyValues(allRows), rows);
  }
}
/**
 * Test that rows are routed to proper files based on bucket col/ROW__ID
 * Only the Vectorized Acid Reader checks if bucketId in ROW__ID inside the file
 * matches the file name and only for files in delete_delta
 *
 * Runs the scenario for bucketing versions 1 and 2; for each version every combination of
 * (isVectorized, isSdpo) is tried in the order (true,true), (true,false), (false,true),
 * (false,false).
 */
@Test
public void testSdpoBucketed() throws Exception {
  final boolean[] flagValues = {true, false};
  for (int bucketingVersion = 1; bucketingVersion <= 2; bucketingVersion++) {
    for (boolean vectorized : flagValues) {
      for (boolean sdpo : flagValues) {
        testSdpoBucketed(vectorized, sdpo, bucketingVersion);
      }
    }
  }
}
/**
 * One run of the bucketed-table scenario: recreates {@code acid_uap}, inserts two rows into
 * each of two partitions, verifies them, updates every row and verifies again.
 *
 * @param isVectorized value written to {@code HIVE_VECTORIZATION_ENABLED}; also selects the
 *                     query variant and is passed on to {@code checkResult}
 * @param isSdpo value written to {@code HIVEOPTSORTDYNAMICPARTITION}
 * @param bucketing_version value of the table's {@code bucketing_version} property
 */
private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketing_version)
    throws Exception {
  HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
  HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION, isSdpo);

  runStatementOnDriver("drop table if exists acid_uap");
  String createTable = "create transactional table acid_uap(a int, b varchar(128)) " +
      "partitioned by (ds string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES " +
      "('bucketing_version'='" + bucketing_version + "')";
  runStatementOnDriver(createTable);

  String insertTomorrow = "insert into table acid_uap partition (ds='tomorrow') " +
      "values (1, 'bah'),(2, 'yah')";
  runStatementOnDriver(insertTomorrow);
  String insertToday = "insert into table acid_uap partition (ds='today') " +
      "values (1, 'bah'),(2, 'yah')";
  runStatementOnDriver(insertToday);
  runStatementOnDriver("select a,b, ds from acid_uap order by a,b, ds");

  // The file-name column is only part of the query in the non-vectorized case.
  final String verifyQuery;
  if (isVectorized) {
    verifyQuery = "select ROW__ID, a, b, ds from acid_uap order by ds, a, b";
  } else {
    verifyQuery = "select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b";
  }

  String[][] afterInsert = new String[][]{
      {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttoday",
          "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001_0"},
      {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttoday",
          "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000_0"},
      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttomorrow",
          "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001_0"},
      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttomorrow",
          "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000_0"}};
  checkResult(afterInsert, verifyQuery, isVectorized, "after insert", LOG);

  runStatementOnDriver("update acid_uap set b = 'fred'");

  String[][] afterUpdate = new String[][]{
      {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t1\tfred\ttoday",
          "warehouse/acid_uap/ds=today/delta_0000003_0000003_0000/bucket_00001"},
      {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\tfred\ttoday",
          "warehouse/acid_uap/ds=today/delta_0000003_0000003_0000/bucket_00000"},
      {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t1\tfred\ttomorrow",
          "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00001"},
      {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\tfred\ttomorrow",
          "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00000"}};
  checkResult(afterUpdate, verifyQuery, isVectorized, "after update", LOG);
}
/**
 * Checks which delta directories remain under the table directory after the cleaner runs:
 * first while a transaction started before the compaction is still open, then after that
 * transaction has committed and a second minor compaction has run.
 */
@Test
public void testCleaner2() throws Exception {
  MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
  dropTable(new String[] {"T"});
  //note: transaction names T1, T2, etc below, are logical, the actual txnid will be different
  runStatementOnDriver("create table T (a int, b int) stored as orc");
  runStatementOnDriver("insert into T values(0,2)");//makes delta_1_1 in T1
  runStatementOnDriver("insert into T values(1,4)");//makes delta_2_2 in T2

  // Switch to a second driver / txn manager pair and open T3 on it.
  Driver secondDriver = new Driver(new QueryState.Builder().withHiveConf(hiveConf).build());
  secondDriver.setMaxRows(10000);
  HiveTxnManager secondTxnMgr =
      TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
  HiveTxnManager firstTxnMgr = swapTxnManager(secondTxnMgr);
  Driver firstDriver = swapDrivers(secondDriver);
  runStatementOnDriver("start transaction");//T3

  /* this select sees, under target/warehouse/t/:
       delta_0000001_0000001_0000 (_orc_acid_version, bucket_00000)
       delta_0000002_0000002_0000 (_orc_acid_version, bucket_00000) */
  String[][] expectedRows = new String[][] {
      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2",
          "t/delta_0000001_0000001_0000/bucket_00000_0"},
      {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t4",
          "t/delta_0000002_0000002_0000/bucket_00000_0"}};
  String dataQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T";
  checkResult(expectedRows, dataQuery, false, "check data", LOG);

  // Swap back to the original pair; T3 stays open on the second pair.
  secondTxnMgr = swapTxnManager(firstTxnMgr);
  secondDriver = swapDrivers(firstDriver);
  runStatementOnDriver("alter table T compact 'minor'");//T4
  TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2

  /* Now we should have, under target/warehouse/t/:
       delta_0000001_0000001_0000     (_orc_acid_version, bucket_00000)
       delta_0000001_0000002_v0000018 (_orc_acid_version, bucket_00000)
       delta_0000002_0000002_0000     (_orc_acid_version, bucket_00000) */
  FileSystem fileSystem = FileSystem.get(hiveConf);
  Path warehousePath = new Path(getWarehouseDir());
  String warehousePrefix = warehousePath.toString();
  Path tableDir = new Path(warehousePath + "/t");
  String[] expectedDirs = new String[] {
      "/t/delta_0000001_0000002_v0000018",
      "/t/delta_0000001_0000001_0000",
      "/t/delta_0000002_0000002_0000",
  };
  FileStatus[] listedDirs = fileSystem.listStatus(tableDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
  checkExpectedFiles(listedDirs, expectedDirs, warehousePrefix);

  /*
  T3 is still running and cannot see what the compactor produced
  (delta_0000001_0000002_v0000018), so it may be reading delta_1_1 & delta_2_2
  and so cleaner cannot delete any files at this point*/
  TestTxnCommands2.runCleaner(hiveConf);
  listedDirs = fileSystem.listStatus(tableDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
  checkExpectedFiles(listedDirs, expectedDirs, warehousePrefix);

  // Swap to the second pair again so that the commit below ends T3.
  swapTxnManager(secondTxnMgr);
  swapDrivers(secondDriver);
  runStatementOnDriver("commit");//commits T3
  //so now cleaner should be able to delete delta_0000002_0000002_0000

  //insert a row so that compactor makes a new delta (due to HIVE-20901)
  runStatementOnDriver("insert into T values(2,5)");//makes delta_3_3 in T5

  runStatementOnDriver("alter table T compact 'minor'");
  runWorker(hiveConf);
  /*
  at this point delta_0000001_0000003_v0000020 is visible to everyone
  so cleaner removes all files shadowed by it (which is everything in this case)
  */
  runCleaner(hiveConf);

  expectedDirs = new String[] {
      "/t/delta_0000001_0000003_v0000020"
  };
  listedDirs = fileSystem.listStatus(tableDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
  checkExpectedFiles(listedDirs, expectedDirs, warehousePrefix);
}
/**
 * Asserts that the listed files are exactly the expected ones.
 * Each actual path is reduced to the text that follows the first occurrence of
 * {@code filePrefix}; that remainder must match one entry of {@code expectedList}.
 * The check fails if an expected entry is not found or an unlisted one shows up.
 *
 * @param actualList file statuses to verify
 * @param expectedList expected path remainders (the part after {@code filePrefix})
 * @param filePrefix text that every actual path must contain; everything up to and
 *                   including its first occurrence is stripped before comparing
 * @throws Exception declared for callers; failures surface as assertion errors
 */
private static void checkExpectedFiles(FileStatus[] actualList, String[] expectedList, String filePrefix) throws Exception {
  Set<String> notFound = new HashSet<>();
  for (String expectedEntry : expectedList) {
    notFound.add(expectedEntry);
  }
  Set<String> unexpected = new HashSet<>();
  for (FileStatus status : actualList) {
    String fullPath = status.getPath().toString();
    int prefixStart = fullPath.indexOf(filePrefix);
    // Without this check a missing prefix (indexOf == -1) would make substring start at
    // filePrefix.length() - 1 and yield a misleading remainder or an index exception.
    Assert.assertTrue("path " + fullPath + " does not contain prefix " + filePrefix,
        prefixStart >= 0);
    String endOfPath = fullPath.substring(prefixStart + filePrefix.length());
    if (!notFound.remove(endOfPath)) {
      unexpected.add(endOfPath);
    }
  }
  Assert.assertTrue("not found set: " + notFound + " unexpected set: " + unexpected,
      notFound.isEmpty() && unexpected.isEmpty());
}
/**
 * Forces a compaction attempt to fail, checks that this leaves exactly one aborted entry in
 * the open-transaction list and one row in TXN_COMPONENTS (before and after a cleaner run),
 * then runs a successful compaction followed by the cleaner.
 */
@Test
public void testCompactionAbort() throws Exception {
  MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
  dropTable(new String[] {"T"});
  //note: transaction names T1, T2, etc below, are logical, the actual txnid will be different
  runStatementOnDriver("create table T (a int, b int) stored as orc");
  runStatementOnDriver("insert into T values(0,2)");//makes delta_1_1 in T1
  runStatementOnDriver("insert into T values(1,4)");//makes delta_2_2 in T2

  //create failed compaction attempt so that compactor txn is aborted
  HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
  runStatementOnDriver("alter table T compact 'minor'");
  runWorker(hiveConf);

  TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
  ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
  Assert.assertEquals("Unexpected number of compactions in history",
      1, resp.getCompactsSize());
  Assert.assertEquals("Unexpected 0th compaction state",
      TxnStore.FAILED_RESPONSE, resp.getCompacts().get(0).getState());
  GetOpenTxnsResponse openResp = txnHandler.getOpenTxns();
  Assert.assertEquals(openResp.toString(), 1, openResp.getOpen_txnsSize());
  //check that the compactor txn is aborted
  Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0));

  runCleaner(hiveConf);
  // Fetch the list again: checking the response obtained before the cleaner ran would
  // say nothing about the state the cleaner left behind.
  openResp = txnHandler.getOpenTxns();
  //we still have 1 aborted (compactor) txn
  Assert.assertEquals(openResp.toString(), 1, openResp.getOpen_txnsSize());
  Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0));
  Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_COMPONENTS"));
  //this returns 1 row since we only have 1 compaction executed
  // countQueryAgent is used here to read the single integer value the query selects.
  int highestCompactWriteId = TxnDbUtil.countQueryAgent(hiveConf,
      "select CC_HIGHEST_WRITE_ID from COMPLETED_COMPACTIONS");
  /**
   * See {@link org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler#updateCompactorState(CompactionInfo, long)}
   * for notes on why CC_HIGHEST_WRITE_ID=TC_WRITEID
   */
  Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_COMPONENTS where TC_WRITEID=" + highestCompactWriteId));

  //now make a successful compactor run so that next Cleaner run actually cleans
  HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
  runStatementOnDriver("alter table T compact 'minor'");
  runWorker(hiveConf);

  resp = txnHandler.showCompact(new ShowCompactRequest());
  Assert.assertEquals("Unexpected number of compactions in history",
      2, resp.getCompactsSize());
  //check both combinations - don't know what order the db returns them in
  String firstState = resp.getCompacts().get(0).getState();
  String secondState = resp.getCompacts().get(1).getState();
  Assert.assertTrue("Unexpected compaction state",
      (TxnStore.FAILED_RESPONSE.equalsIgnoreCase(firstState)
          && TxnStore.CLEANING_RESPONSE.equalsIgnoreCase(secondState)) ||
      (TxnStore.CLEANING_RESPONSE.equalsIgnoreCase(firstState)
          && TxnStore.FAILED_RESPONSE.equalsIgnoreCase(secondState)));

  //delete metadata about aborted txn from txn_components and files (if any)
  runCleaner(hiveConf);
}
/**
 * Not enough deltas to compact, no need to clean: there is absolutely nothing to do.
 * TXN_COMPONENTS must be empty after the worker run and still empty after the cleaner run.
 */
@Test
public void testNotEnoughToCompact() throws Exception {
  int[][] rows = {{1, 2}, {3, 4}};
  String insert = "insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows);
  runStatementOnDriver(insert);
  String compact = "alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'";
  runStatementOnDriver(compact);

  runWorker(hiveConf);
  assertTableIsEmpty("TXN_COMPONENTS");

  runCleaner(hiveConf);
  assertTableIsEmpty("TXN_COMPONENTS");
}
/**
 * There aren't enough deltas to compact, but cleaning is needed because an insert overwrite
 * was executed. TXN_COMPONENTS must be empty after the worker run and still empty after the
 * cleaner run.
 */
@Test
public void testNotEnoughToCompactNeedsCleaning() throws Exception {
  int[][] rows = {{1, 2}, {3, 4}};
  String insert = "insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows);
  runStatementOnDriver(insert);
  String overwrite =
      "insert overwrite table " + Table.ACIDTBL + " " + makeValuesClause(rows);
  runStatementOnDriver(overwrite);
  String compact = "alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'";
  runStatementOnDriver(compact);

  runWorker(hiveConf);
  assertTableIsEmpty("TXN_COMPONENTS");

  runCleaner(hiveConf);
  assertTableIsEmpty("TXN_COMPONENTS");
}
/**
 * Asserts that {@code select count(*)} on the given table yields 0; the table's full content
 * (as rendered by {@code TxnDbUtil.queryToString}) is used as the failure message.
 *
 * @param table name of the table to check
 */
private void assertTableIsEmpty(String table) throws Exception {
  String tableDump = TxnDbUtil.queryToString(hiveConf, "select * from " + table);
  int rowCount = TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table);
  Assert.assertEquals(tableDump, 0, rowCount);
}
/**
 * Asserts that {@code select count(*) from TXNS} yields 1; the content of TXNS (as rendered
 * by {@code TxnDbUtil.queryToString}) is used as the failure message.
 */
private void assertOneTxn() throws Exception {
  String txnsDump = TxnDbUtil.queryToString(hiveConf, "select * from TXNS");
  int txnCount = TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS");
  Assert.assertEquals(txnsDump, 1, txnCount);
}
}