/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestTxnCommands2 {
  private static final Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands2.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
//bucket count for test tables; set it to 1 for easier debugging
static int BUCKET_COUNT = 2;
@Rule
public TestName testName = new TestName();
protected HiveConf hiveConf;
protected Driver d;
private TxnStore txnHandler;
protected enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart", "p"),
NONACIDORCTBL("nonAcidOrcTbl"),
NONACIDPART("nonAcidPart", "p"),
NONACIDPART2("nonAcidPart2", "p2"),
ACIDNESTEDPART("acidNestedPart", "p,q"),
MMTBL("mmTbl");
private final String name;
private final String partitionColumns;
@Override
public String toString() {
return name;
}
String getPartitionColumns() {
return partitionColumns;
}
Table(String name) {
this(name, null);
}
Table(String name, String partitionColumns) {
this.name = name;
this.partitionColumns = partitionColumns;
}
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp() throws Exception {
setUpWithTableProperties("'transactional'='true'");
}
void setUpWithTableProperties(String tableProperties) throws Exception {
hiveConf = new HiveConf(this.getClass());
Path workDir = new Path(System.getProperty("test.tmp.dir",
"target" + File.separator + "test" + File.separator + "tmp"));
hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName()
+ File.separator + "mapred" + File.separator + "local");
hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName()
+ File.separator + "mapred" + File.separator + "system");
hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName()
+ File.separator + "mapred" + File.separator + "staging");
hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName()
+ File.separator + "mapred" + File.separator + "temp");
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
//TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version
//of these tests.
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb(hiveConf);
txnHandler = TxnUtils.getTxnStore(hiveConf);
File f = new File(TEST_WAREHOUSE_DIR);
if (f.exists()) {
FileUtil.fullyDelete(f);
}
if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
}
SessionState ss = SessionState.start(hiveConf);
ss.applyAuthorizationPolicy();
d = new Driver(new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build());
d.setMaxRows(10000);
dropTables();
runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDPART2 +
"(a2 int, b2 int) partitioned by (p2 string) stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
"(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
" buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
}
protected void dropTables() throws Exception {
for(Table t : Table.values()) {
runStatementOnDriver("drop table if exists " + t);
}
}
@After
public void tearDown() throws Exception {
try {
if (d != null) {
dropTables();
d.close();
d.destroy();
d = null;
}
TxnDbUtil.cleanDb(hiveConf);
} finally {
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}
}
@Test
public void testOrcPPD() throws Exception {
testOrcPPD(true);
}
@Test
public void testOrcNoPPD() throws Exception {
testOrcPPD(false);
}
  /**
   * This is run twice: once with PPD enabled and once with it disabled.
   * The queries are constructed so that pushing a predicate down into an update/delete delta
   * would produce wrong results.
   * @param enablePPD
   * @throws Exception
   */
private void testOrcPPD(boolean enablePPD) throws Exception {
boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD
//create delta_0001_0001_0000 (should push predicate here)
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
List<String> explain;
String query = "update " + Table.ACIDTBL + " set b = 5 where a = 3";
if (enablePPD) {
explain = runStatementOnDriver("explain " + query);
/*
here is a portion of the above "explain". The "filterExpr:" in the TableScan is the pushed predicate
w/o PPD, the line is simply not there, otherwise the plan is the same
Map Operator Tree:,
TableScan,
alias: acidtbl,
filterExpr: (a = 3) (type: boolean),
Filter Operator,
predicate: (a = 3) (type: boolean),
Select Operator,
...
*/
assertExplainHasString("filterExpr: (a = 3)", explain, "PPD wasn't pushed");
}
//create delta_0002_0002_0000 (can't push predicate)
runStatementOnDriver(query);
query = "select a,b from " + Table.ACIDTBL + " where b = 4 order by a,b";
if (enablePPD) {
      /*at this point we have 2 delta directories: 1 from the insert, 1 from the update.
       * We should push the predicate into the 1st one but not the 2nd. If the following 'select'
       * were to push into the 'update' delta, we'd filter out {3,5} before doing the merge and
       * thus produce {3,4} as the value for the 2nd row. The right result is 0 rows.*/
explain = runStatementOnDriver("explain " + query);
assertExplainHasString("filterExpr: (b = 4)", explain, "PPD wasn't pushed");
}
List<String> rs0 = runStatementOnDriver(query);
Assert.assertEquals("Read failed", 0, rs0.size());
runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
//now we have base_0001 file
int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
//now we have delta_0003_0003_0000 with inserts only (ok to push predicate)
if (enablePPD) {
explain = runStatementOnDriver("explain delete from " + Table.ACIDTBL + " where a=7 and b=8");
assertExplainHasString("filterExpr: ((a = 7) and (b = 8))", explain, "PPD wasn't pushed");
}
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
//now we have delta_0004_0004_0000 with delete events
/*(can't push predicate to 'delete' delta)
* if we were to push to 'delete' delta, we'd filter out all rows since the 'row' is always NULL for
* delete events and we'd produce data as if the delete never happened*/
query = "select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b";
if(enablePPD) {
explain = runStatementOnDriver("explain " + query);
assertExplainHasString("filterExpr: (a > 1)", explain, "PPD wasn't pushed");
}
List<String> rs1 = runStatementOnDriver(query);
int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}};
Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
}
static void assertExplainHasString(String string, List<String> queryPlan, String errMsg) {
for(String line : queryPlan) {
if(line != null && line.contains(string)) {
return;
}
}
    Assert.fail(errMsg);
}
@Test
public void testAlterTable() throws Exception {
int[][] tableData = {{1,2}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
int[][] tableData2 = {{5,6}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where b > 0 order by a,b");
runStatementOnDriver("alter table " + Table.ACIDTBL + " add columns(c int)");
int[][] moreTableData = {{7,8,9}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + makeValuesClause(moreTableData));
List<String> rs0 = runStatementOnDriver("select a,b,c from " + Table.ACIDTBL + " where a > 0 order by a,b,c");
}
// @Ignore("not needed but useful for testing")
@Test
public void testNonAcidInsert() throws Exception {
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
}
@Test
public void testOriginalFileReaderWhenNonAcidConvertedToAcid() throws Exception {
// 1. Insert five rows to Non-ACID table.
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(3,4),(5,6),(7,8),(9,10)");
// 2. Convert NONACIDORCTBL to ACID table. //todo: remove trans_prop after HIVE-17089
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = b*2 where b in (4,10)");
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 7");
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
int[][] resultData = new int[][] {{1,2}, {3,8}, {5,6}, {9,20}};
Assert.assertEquals(stringifyValues(resultData), rs);
// 3. Perform a major compaction.
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
    // 4. Perform a delete.
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1");
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
resultData = new int[][] {{3,8}, {5,6}, {9,20}};
Assert.assertEquals(stringifyValues(resultData), rs);
}
/**
* see HIVE-16177
* See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
* {@link TestTxnNoBuckets#testCTAS()}
*/
@Test
public void testNonAcidToAcidConversion02() throws Exception {
//create 2 rows in a file 000001_0 (and an empty 000000_0)
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)");
//create 2 rows in a file 000000_0_copy1 and 2 rows in a file 000001_0_copy1
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
//create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?)
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
//convert the table to Acid //todo: remove trans_prop after HIVE-17089
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
    //create some delta directories
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where a = 0 and b = 12");
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)");
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 and b = 3");
List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
LOG.warn("before compact");
for(String s : rs) {
LOG.warn(s);
}
Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
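    //A sketch of how the magic numbers above decode, assuming BucketCodec.V1 packs the codec
    //version into the top 3 bits of the int and the bucket id shifted left by 16:
    //536870912 = 1 << 29 (version 1, bucket 0); 536936448 = (1 << 29) | (1 << 16) (bucket 1).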
/*
* All ROW__IDs are unique on read after conversion to acid
* ROW__IDs are exactly the same before and after compaction
* Also check the file name (only) after compaction for completeness
* Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
*/
String[][] expected = {
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t13", "bucket_00001"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t0\t15", "bucket_00001"},
{"{\"writeid\":10000003,\"bucketid\":536936448,\"rowid\":0}\t0\t17", "bucket_00001"},
{"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t0\t120", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":5}\t1\t4", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t5", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":6}\t1\t6", "bucket_00001"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":1}\t1\t16", "bucket_00001"}
};
Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
for(int i = 0; i < expected.length; i++) {
Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i) + "; expected " + expected[i][0],
rs.get(i).startsWith(expected[i][0]));
}
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
//TestTxnCommands2.runCleaner(hiveConf);
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
LOG.warn("after compact");
for(String s : rs) {
LOG.warn(s);
}
Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size());
for(int i = 0; i < expected.length; i++) {
Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
}
//make sure they are the same before and after compaction
}
/**
 * In the current implementation of ACID, altering the value of transactional_properties, or
 * setting it on a table where it was previously unset, will throw an exception.
* @throws Exception
*/
@Test
public void testFailureOnAlteringTransactionalProperties() throws Exception {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')");
}
/**
* Test the query correctness and directory layout for ACID table conversion
* 1. Insert a row to Non-ACID table
* 2. Convert Non-ACID to ACID table
* 3. Insert a row to ACID table
* 4. Perform Major compaction
* 5. Clean
* @throws Exception
*/
@Test
public void testNonAcidToAcidConversion1() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert a row to Non-ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 original bucket files in the location (000000_0 and 000001_0)
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
int [][] resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
int resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 2. Convert NONACIDORCTBL to ACID table
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// Everything should be same as before
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 3. Insert another row to newly-converted ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
    // The delta directory should also have only 1 bucket file (bucket_00000_0)
Assert.assertEquals(3, status.length);
boolean sawNewDelta = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, buckets.length); // only one bucket file
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000_0"));
} else {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
}
Assert.assertTrue(sawNewDelta);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
resultData = new int[][] {{1, 2}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 4. Perform a major compaction
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
// There should be 1 new directory: base_xxxxxxx.
// Original bucket files and delta directory should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(4, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
}
}
Assert.assertTrue(sawNewBase);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{3, 4}, {1, 2}};
Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 5. Let Cleaner delete obsolete files/dirs
    // Note: here we create a fake subdirectory along with fake files, simulating original directories/files
String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
"/subdir/000000_0";
String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
"/subdir/000000_1";
fs.create(new Path(fakeFile0));
fs.create(new Path(fakeFile1));
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// Before Cleaner, there should be 5 items:
// 2 original files, 1 original directory, 1 base directory and 1 delta directory
Assert.assertEquals(5, status.length);
runCleaner(hiveConf);
// There should be only 1 directory left: base_xxxxxxx.
// Original bucket files and delta directory should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{3, 4}, {1, 2}};
Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
/**
* Test the query correctness and directory layout for ACID table conversion
* 1. Insert a row to Non-ACID table
* 2. Convert Non-ACID to ACID table
* 3. Update the existing row in ACID table
* 4. Perform Major compaction
* 5. Clean
* @throws Exception
*/
@Test
public void testNonAcidToAcidConversion2() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert a row to Non-ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 original bucket files in the location (000000_0 and 000001_0)
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
int [][] resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
int resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 2. Convert NONACIDORCTBL to ACID table
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// Everything should be same as before
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 3. Update the existing row in newly-converted ACID table
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
// and one delete_delta directory. When split-update is enabled, an update event is split into
    // a combination of a delete and an insert, which generates the delete_delta directory.
    // The delta directory should have only 1 bucket file, and so should the delete_delta
    // directory, since the updated row lives in a single bucket.
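    // As an illustration (names here are schematic), the single update above yields something like:
    //   delta_x_x/bucket_0000N        <- the new version of the row (the "insert" half)
    //   delete_delta_x_x/bucket_0000N <- the delete event for the old version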
Assert.assertEquals(4, status.length);
boolean sawNewDelta = false;
boolean sawNewDeleteDelta = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
} else if (status[i].getPath().getName().matches("delete_delta_.*")) {
sawNewDeleteDelta = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
} else {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
}
Assert.assertTrue(sawNewDelta);
Assert.assertTrue(sawNewDeleteDelta);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 4. Perform a major compaction
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
    // There should be 1 new directory: base_xxxxxxx.
    // Original bucket files, delta and delete_delta directories should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(5, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
}
}
Assert.assertTrue(sawNewBase);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 5. Let Cleaner delete obsolete files/dirs
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// Before Cleaner, there should be 5 items:
// 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
Assert.assertEquals(5, status.length);
runCleaner(hiveConf);
    // There should be only 1 directory left: base_xxxxxxx.
// Original bucket files, delta directory and delete_delta should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
/**
* Test the query correctness and directory layout for ACID table conversion
* 1. Insert a row to Non-ACID table
* 2. Convert Non-ACID to ACID table
* 3. Perform Major compaction
* 4. Insert a new row to ACID table
* 5. Perform another Major compaction
* 6. Clean
* @throws Exception
*/
@Test
public void testNonAcidToAcidConversion3() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert a row to Non-ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 original bucket files in the location (000000_0 and 000001_0)
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
int [][] resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
int resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 2. Convert NONACIDORCTBL to ACID table
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// Everything should be same as before
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 3. Perform a major compaction
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
// There should be 1 new directory: base_-9223372036854775808
// Original bucket files should stay until Cleaner kicks in.
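    // The base appears to use Long.MIN_VALUE (-9223372036854775808) as its write id because the
    // compacted data came entirely from original (pre-ACID) files, which carry no write id.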
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(3, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
        //the name should be base_-9223372036854775808_v0000021, but the 21 is a txn id (not a
        //write id), which makes this assertion fragile
Assert.assertTrue(status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000021"));
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
}
Assert.assertTrue(sawNewBase);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 4. Update the existing row, and insert another row to newly-converted ACID table
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
// There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
// plus two new delta directories and one delete_delta directory that would be created due to
// the update statement (remember split-update U=D+I)!
Assert.assertEquals(6, status.length);
int numDelta = 0;
int numDeleteDelta = 0;
sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
numDelta++;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
Assert.assertEquals("delta_10000001_10000001_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
Assert.assertEquals("delta_10000002_10000002_0000", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00000_0", buckets[0].getPath().getName());
}
} else if (status[i].getPath().getName().matches("delete_delta_.*")) {
numDeleteDelta++;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDeleteDelta == 1) {
Assert.assertEquals("delete_delta_10000001_10000001_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
} else if (status[i].getPath().getName().matches("base_.*")) {
Assert.assertTrue("base_-9223372036854775808", status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000021"));//_v0000021
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
}
Assert.assertEquals(2, numDelta);
Assert.assertEquals(1, numDeleteDelta);
Assert.assertTrue(sawNewBase);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 5. Perform another major compaction
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
    // There should be 1 new base directory: base_10000002
// Original bucket files, delta directories, delete_delta directories and the
// previous base directory should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(status);
Assert.assertEquals(7, status.length);
int numBase = 0;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
numBase++;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numBase == 1) {
Assert.assertEquals("base_-9223372036854775808_v0000021", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
          // The new base dir now has two bucket files, since the delta dirs between them cover both buckets
Assert.assertEquals("base_10000002_v0000028", status[i].getPath().getName());
Assert.assertEquals(2, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
}
}
}
Assert.assertEquals(2, numBase);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{3, 4}, {1, 3}};
Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 6. Let Cleaner delete obsolete files/dirs
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
    // Before Cleaner, there should be 7 items:
// 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
Assert.assertEquals(7, status.length);
runCleaner(hiveConf);
    // There should be only 1 directory left: base_10000002_v0000028.
    // Original bucket files, delta/delete_delta directories and the previous base directory should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertEquals("base_10000002_v0000028", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{3, 4}, {1, 3}};
Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
@Test
public void testValidTxnsBookkeeping() throws Exception {
// 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
runStatementOnDriver("select * from " + Table.NONACIDORCTBL);
String value = hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
Assert.assertNull("The entry should be null for query that doesn't involve ACID tables", value);
}
@Test
public void testSimpleRead() throws Exception {
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "more");
int[][] tableData = {{1,2},{3,3}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData));
int[][] tableData2 = {{5,3}};
    //this will cause the next txn to be marked aborted, but the data is still written to disk
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData2));
assert hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) == null : "previous txn should've cleaned it";
//so now if HIVEFETCHTASKCONVERSION were to use a stale value, it would use a
//ValidWriteIdList with HWM=MAX_LONG, i.e. include the data for aborted txn
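    //(when present, the conf value is a serialized ValidTxnWriteIdList, roughly
    //"<txnId>$<db.table>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>")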
List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
Assert.assertEquals("Extra data", 2, rs.size());
}
@Test
public void testUpdateMixedCase() throws Exception {
int[][] tableData = {{1,2},{3,3},{5,3}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
runStatementOnDriver("update " + Table.ACIDTBL + " set B = 7 where A=1");
List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] updatedData = {{1,7},{3,3},{5,3}};
Assert.assertEquals("Update failed", stringifyValues(updatedData), rs);
runStatementOnDriver("update " + Table.ACIDTBL + " set B = B + 1 where A=1");
List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] updatedData2 = {{1,8},{3,3},{5,3}};
Assert.assertEquals("Update failed", stringifyValues(updatedData2), rs2);
}
@Test
public void testDeleteIn() throws Exception {
int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,7),(3,7)");
//todo: once multistatement txns are supported, add a test to run next 2 statements in a single txn
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.NONACIDORCTBL);
List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}};
Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs);
runStatementOnDriver("update " + Table.ACIDTBL + " set b=19 where b in(select b from " + Table.NONACIDORCTBL + " where a = 3)");
List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] updatedData2 = {{1,19},{3,19},{5,2},{5,3}};
Assert.assertEquals("Bulk update2 failed", stringifyValues(updatedData2), rs2);
}
/**
 * Test update that hits multiple partitions (i.e. requires dynamic partition insert to process)
* @throws Exception
*/
@Test
public void updateDeletePartitioned() throws Exception {
int[][] tableData = {{1,2},{3,4},{5,6}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
CompactionRequest request = new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR);
request.setPartitionname("p=1");
txnHandler.compact(request);
runWorker(hiveConf);
runCleaner(hiveConf);
runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3");
txnHandler.compact(request);
runWorker(hiveConf);
runCleaner(hiveConf);
List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
int[][] expectedData = {{1,1,2},{1,3,5},{1,5,6},{2,1,2},{2,3,5},{2,5,6}};
Assert.assertEquals("Update " + Table.ACIDTBLPART + " didn't match:", stringifyValues(expectedData), rs);
}
/**
* https://issues.apache.org/jira/browse/HIVE-17391
*/
@Test
public void testEmptyInTblproperties() throws Exception {
runStatementOnDriver("create table t1 " + "(a int, b int) stored as orc TBLPROPERTIES ('serialization.null.format'='', 'transactional'='true')");
runStatementOnDriver("insert into t1 " + "(a,b) values(1,7),(3,7)");
runStatementOnDriver("update t1" + " set b = -2 where a = 1");
runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
runWorker(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
}
/**
* https://issues.apache.org/jira/browse/HIVE-10151
*/
@Test
public void testBucketizedInputFormat() throws Exception {
int[][] tableData = {{1,2}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBLPART + " where p = 1");
List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL);//no order by as it's just 1 row
Assert.assertEquals("Insert into " + Table.ACIDTBL + " didn't match:", stringifyValues(tableData), rs);
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) select a,b from " + Table.ACIDTBLPART + " where p = 1");
List<String> rs2 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);//no order by as it's just 1 row
Assert.assertEquals("Insert into " + Table.NONACIDORCTBL + " didn't match:", stringifyValues(tableData), rs2);
}
@Test
public void testInsertOverwriteWithSelfJoin() throws Exception {
int[][] part1Data = {{1,7}};
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) " + makeValuesClause(part1Data));
    //this works because logically we need an S lock on NONACIDORCTBL to read and an X lock to
    //write, but LockRequestBuilder dedups locks on the same entity and keeps only the highest
    //level lock requested
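    //(e.g. the S and X requests on NONACIDORCTBL here collapse into a single X lock)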
runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select 2, 9 from " + Table.NONACIDORCTBL + " T inner join " + Table.NONACIDORCTBL + " S on T.a=S.a");
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
int[][] joinData = {{2,9}};
Assert.assertEquals("Self join non-part insert overwrite failed", stringifyValues(joinData), rs);
int[][] part2Data = {{1,8}};
runStatementOnDriver("insert into " + Table.NONACIDPART + " partition(p=1) (a,b) " + makeValuesClause(part1Data));
runStatementOnDriver("insert into " + Table.NONACIDPART + " partition(p=2) (a,b) " + makeValuesClause(part2Data));
//here we need X lock on p=1 partition to write and S lock on 'table' to read which should
//not block each other since they are part of the same txn
runStatementOnDriver("insert overwrite table " + Table.NONACIDPART + " partition(p=1) select a,b from " + Table.NONACIDPART);
List<String> rs2 = runStatementOnDriver("select a,b from " + Table.NONACIDPART + " order by a,b");
int[][] updatedData = {{1,7},{1,8},{1,8}};
Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2);
//insert overwrite not supported for ACID tables
}
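  /*
   * A note on the compaction states checked below (matching the TxnStore constants used in
   * countCompacts()): a request normally moves initiated -> working -> ready for cleaning ->
   * succeeded; "attempted" marks requests the Initiator examined but declined to schedule, and
   * "failed" marks compactions that errored out.
   */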
private static void checkCompactionState(CompactionsByState expected, CompactionsByState actual) {
Assert.assertEquals(TxnStore.ATTEMPTED_RESPONSE, expected.attempted, actual.attempted);
Assert.assertEquals(TxnStore.FAILED_RESPONSE, expected.failed, actual.failed);
Assert.assertEquals(TxnStore.INITIATED_RESPONSE, expected.initiated, actual.initiated);
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, expected.readyToClean, actual.readyToClean);
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, expected.succeeded, actual.succeeded);
Assert.assertEquals(TxnStore.WORKING_RESPONSE, expected.working, actual.working);
Assert.assertEquals("total", expected.total, actual.total);
}
/**
* HIVE-12353
* @throws Exception
*/
@Test
public void testInitiatorWithMultipleFailedCompactions() throws Exception {
testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'");
}
void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception {
String tblName = "hive12353";
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
" STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )");
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
for(int i = 0; i < 5; i++) {
//generate enough delta files so that Initiator can trigger auto compaction
runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')");
}
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON, true);
int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
AtomicBoolean stop = new AtomicBoolean(true);
//create failed compactions
for(int i = 0; i < numFailedCompactions; i++) {
//each of these should fail
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
runWorker(hiveConf);
}
    //this should not schedule a new compaction due to prior failures, but will create an Attempted entry
Initiator init = new Initiator();
init.setThreadId((int)init.getId());
init.setConf(hiveConf);
init.init(stop);
init.run();
int numAttemptedCompactions = 1;
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
houseKeeper.setConf(hiveConf);
houseKeeper.run();
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
runWorker(hiveConf);//will fail
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
runWorker(hiveConf);//will fail
init.run();
numAttemptedCompactions++;
init.run();
numAttemptedCompactions++;
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler));
houseKeeper.run();
    //only COMPACTOR_HISTORY_RETENTION_FAILED failed (and COMPACTOR_HISTORY_RETENTION_ATTEMPTED
    //attempted) compactions are left, since those are the only kinds we have here
checkCompactionState(new CompactionsByState(
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,0,0,
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)), countCompacts(txnHandler));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
//at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated (explicitly by user)
checkCompactionState(new CompactionsByState(
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),1,0,0,0,
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
checkCompactionState(new CompactionsByState(
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,1,0,0,
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
runCleaner(hiveConf); // transition to Success state
houseKeeper.run();
checkCompactionState(new CompactionsByState(
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0,
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
}
/**
* Make sure there's no FileSystem$Cache$Key leak due to UGI use
* @throws Exception
*/
@Test
public void testFileSystemUnCaching() throws Exception {
int cacheSizeBefore;
int cacheSizeAfter;
// get the size of cache BEFORE
cacheSizeBefore = getFileSystemCacheSize();
// Insert a row to ACID table
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
// Perform a major compaction
runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major'");
runWorker(hiveConf);
runCleaner(hiveConf);
// get the size of cache AFTER
cacheSizeAfter = getFileSystemCacheSize();
Assert.assertEquals(cacheSizeBefore, cacheSizeAfter);
}
private int getFileSystemCacheSize() throws Exception {
try {
Field cache = FileSystem.class.getDeclaredField("CACHE");
cache.setAccessible(true);
Object o = cache.get(null); // FileSystem.CACHE
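      // FileSystem.CACHE is keyed by FileSystem$Cache$Key, which includes (scheme, authority, UGI),
      // so a UGI leak shows up as unbounded growth of the underlying map.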
Field mapField = o.getClass().getDeclaredField("map");
mapField.setAccessible(true);
      Map<?, ?> map = (Map<?, ?>) mapField.get(o); // FileSystem.CACHE.map
return map.size();
} catch (NoSuchFieldException e) {
      LOG.warn("Unable to read FileSystem.CACHE size via reflection", e);
}
return 0;
}
private static class CompactionsByState {
private int attempted;
private int failed;
private int initiated;
private int readyToClean;
private int succeeded;
private int working;
private int total;
CompactionsByState() {
this(0,0,0,0,0,0,0);
}
CompactionsByState(int attempted, int failed, int initiated, int readyToClean, int succeeded, int working, int total) {
this.attempted = attempted;
this.failed = failed;
this.initiated = initiated;
this.readyToClean = readyToClean;
this.succeeded = succeeded;
this.working = working;
this.total = total;
}
}
private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
CompactionsByState compactionsByState = new CompactionsByState();
compactionsByState.total = resp.getCompactsSize();
for(ShowCompactResponseElement compact : resp.getCompacts()) {
if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) {
compactionsByState.failed++;
}
else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) {
compactionsByState.readyToClean++;
}
else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) {
compactionsByState.initiated++;
}
else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) {
compactionsByState.succeeded++;
}
else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) {
compactionsByState.working++;
}
else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
compactionsByState.attempted++;
}
else {
throw new IllegalStateException("Unexpected state: " + compact.getState());
}
}
return compactionsByState;
}
public static void runWorker(HiveConf hiveConf) throws Exception {
TxnCommandsBaseForTests.runWorker(hiveConf);
}
public static void runCleaner(HiveConf hiveConf) throws Exception {
TxnCommandsBaseForTests.runCleaner(hiveConf);
}
public static void runInitiator(HiveConf hiveConf) throws Exception {
TxnCommandsBaseForTests.runInitiator(hiveConf);
}
/**
* HIVE-12352 has details
* @throws Exception
*/
@Test
public void writeBetweenWorkerAndCleaner() throws Exception {
writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'");
}
private void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception {
String tblName = "hive12352";
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
" STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )");
//create some data
runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
//run Worker to execute compaction
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
runWorker(hiveConf);
//delete something, but make sure txn is rolled back
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("delete from " + tblName + " where a = 1");
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
List<String> expected = new ArrayList<>();
expected.add("1\tfoo");
expected.add("2\tbar");
expected.add("3\tblah");
Assert.assertEquals("", expected,
runStatementOnDriver("select a,b from " + tblName + " order by a"));
runCleaner(hiveConf);
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toLowerCase()),
FileUtils.HIDDEN_FILES_PATH_FILTER);
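// Minor compaction writes compacted dirs named delta_<minWriteId>_<maxWriteId>_v<visibilityTxnId>
// (and delete_delta_... for delete events); the v0000019 suffix below is just the compactor's
// txn id given this test's fixed statement sequence.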
Set<String> expectedDeltas = new HashSet<>();
expectedDeltas.add("delete_delta_0000001_0000002_v0000019");
expectedDeltas.add("delta_0000001_0000002_v0000019");
Set<String> actualDeltas = new HashSet<>();
for(FileStatus file : status) {
actualDeltas.add(file.getPath().getName());
}
Assert.assertEquals(expectedDeltas, actualDeltas);
//this seems odd, but we want to make sure CompactionTxnHandler.cleanEmptyAbortedTxns() gets run
runInitiator(hiveConf);
//and check that aborted delete operation didn't become committed
Assert.assertEquals("", expected,
runStatementOnDriver("select a,b from " + tblName + " order by a"));
}
/**
* Simulate the scenario where a heartbeat fails due to client errors such as no locks or no txns being found.
* When a heartbeat fails, the query should fail too.
* @throws Exception
*/
@Test
public void testFailHeartbeater() throws Exception {
// Fail heartbeater, so that we can get a RuntimeException from the query.
// More specifically, it's the original IOException thrown by either MR's or Tez's progress monitoring loop.
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
Exception exception = null;
try {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
} catch (RuntimeException e) {
exception = e;
}
Assert.assertNotNull(exception);
Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true"));
}
@Test
public void testOpenTxnsCounter() throws Exception {
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3);
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS);
OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService();
openTxnsCounterService.setConf(hiveConf);
openTxnsCounterService.run(); // will update current number of open txns to 3
MetaException exception = null;
// This should fail once it finds out the threshold has been reached
try {
txnHandler.openTxns(new OpenTxnRequest(1, "you", "localhost"));
} catch (MetaException e) {
exception = e;
}
Assert.assertNotNull("Opening new transaction shouldn't be allowed", exception);
Assert.assertEquals("Maximum allowed number of open transactions has been reached. See hive.max.open.txns.", exception.getMessage());
// After committing the initial txns, and updating current number of open txns back to 0,
// new transactions should be allowed to open
for (long txnid : openTxnsResponse.getTxn_ids()) {
txnHandler.commitTxn(new CommitTxnRequest(txnid));
}
openTxnsCounterService.run(); // will update current number of open txns back to 0
exception = null;
try {
txnHandler.openTxns(new OpenTxnRequest(1, "him", "localhost"));
} catch (MetaException e) {
exception = e;
}
Assert.assertNull(exception);
}
@Test
public void testCompactWithDelete() throws Exception {
int[][] tableData = {{1,2},{3,4}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
runStatementOnDriver("update " + Table.ACIDTBL + " set b = -2 where b = 2");
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MINOR'");
runWorker(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
Assert.assertEquals("Unexpected 1 compaction state", TxnStore.SUCCEEDED_RESPONSE,
resp.getCompacts().get(1).getState());
}
/**
* make sure Aborted txns don't red-flag a base_xxxx (HIVE-14350)
*/
@Test
public void testNoHistory() throws Exception {
int[][] tableData = {{1,2},{3,4}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
runCleaner(hiveConf);
runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
}
@Test
public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true'");
}
protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProperties) throws Exception {
String tblName = "acidWithSchemaEvol";
int numBuckets = 1;
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO " + numBuckets +" BUCKETS" + //currently ACID requires table to be bucketed
" STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )");
// create some data
runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
// apply schema evolution by adding some columns
runStatementOnDriver("alter table " + tblName + " add columns(c int, d string)");
// insert some data in new schema
runStatementOnDriver("insert into " + tblName + " values(4, 'acid', 100, 'orc'),"
+ "(5, 'llap', 200, 'tez')");
// update old data with values for the new schema columns
runStatementOnDriver("update " + tblName + " set d = 'hive' where a <= 3");
runStatementOnDriver("update " + tblName + " set c = 999 where a <= 3");
// read the entire data back and see if we did everything right
List<String> rs = runStatementOnDriver("select * from " + tblName + " order by a");
String[] expectedResult = { "1\tfoo\t999\thive", "2\tbar\t999\thive", "3\tblah\t999\thive", "4\tacid\t100\torc", "5\tllap\t200\ttez" };
Assert.assertEquals(Arrays.asList(expectedResult), rs);
// now compact and see if compaction still preserves the data correctness
runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'");
runWorker(hiveConf);
// create a low water mark aborted transaction and clean the older ones
createAbortLowWaterMark();
runCleaner(hiveConf); // Cleaner would remove the obsolete files.
// Verify that there is now only 1 directory left: base_xxxxxxx; the rest have been cleaned.
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toString().toLowerCase()),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(numBuckets, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
}
}
Assert.assertTrue(sawNewBase);
rs = runStatementOnDriver("select * from " + tblName + " order by a");
Assert.assertEquals(Arrays.asList(expectedResult), rs);
}
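/**
 * Opens a txn that is immediately rolled back (via the test-mode rollback flag), waits out
 * metastore.txn.opentxn.timeout and runs the Initiator, so the metastore can establish an
 * aborted-txn low water mark before the Cleaner runs.
 */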
protected void createAbortLowWaterMark() throws Exception{
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("select * from " + Table.ACIDTBL);
// wait for metastore.txn.opentxn.timeout
Thread.sleep(1000);
runInitiator(hiveConf);
}
@Test
public void testETLSplitStrategyForACID() throws Exception {
hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL");
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2)");
runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL + " where a = 1");
int[][] resultData = new int[][] {{1,2}};
Assert.assertEquals(stringifyValues(resultData), rs);
}
@Test
public void testAcidWithSchemaEvolution() throws Exception {
hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL");
String tblName = "acidTblWithSchemaEvol";
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
" STORED AS ORC TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("INSERT INTO " + tblName + " VALUES (1, 'foo'), (2, 'bar')");
// Major compact to create a base that has ACID schema.
runStatementOnDriver("ALTER TABLE " + tblName + " COMPACT 'MAJOR'");
runWorker(hiveConf);
// Alter the table to perform schema evolution.
runStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)");
// Validate there is an added NULL for column c.
List<String> rs = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
String[] expectedResult = { "1\tfoo\tNULL", "2\tbar\tNULL" };
Assert.assertEquals(Arrays.asList(expectedResult), rs);
}
/**
* Test that ACID works with multi-insert statement
*/
@Test
public void testMultiInsertStatement() throws Exception {
int[][] sourceValsOdd = {{5,5},{11,11}};
int[][] sourceValsEven = {{2,2}};
//populate source
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(sourceValsOdd));
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(sourceValsEven));
int[][] targetValsOdd = {{5,6},{7,8}};
int[][] targetValsEven = {{2,1},{4,3}};
//populate target
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='odd') " + makeValuesClause(targetValsOdd));
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='even') " + makeValuesClause(targetValsEven));
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " order by a,b");
int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
Assert.assertEquals(stringifyValues(targetVals), r);
//currently multi-insert doesn't allow same table/partition in > 1 output branch
String s = "from " + Table.ACIDTBLPART + " target right outer join " +
Table.NONACIDPART2 + " source on target.a = source.a2 " +
" INSERT INTO TABLE " + Table.ACIDTBLPART + " PARTITION(p='even') select source.a2, source.b2 where source.a2=target.a " +
" insert into table " + Table.ACIDTBLPART + " PARTITION(p='odd') select source.a2,source.b2 where target.a is null";
//r = runStatementOnDriver("explain formatted " + s);
//LOG.info("Explain formatted: " + r.toString());
runStatementOnDriver(s);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='even' order by a,b");
int[][] rExpected = {{2,1},{2,2},{4,3},{5,5}};
Assert.assertEquals(stringifyValues(rExpected), r);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='odd' order by a,b");
int[][] rExpected2 = {{5,6},{7,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected2), r);
}
/**
 * Check that we can specify insert columns.
 *
 * Need to figure out the semantics: if a row from the base expression ends up in both the Update and Delete clauses,
 * we'll write the Update event to 1 delta and the Delete event to another. Given that we collapse events for the
 * same current txn with different stmt ids to the latest one, the delete will win.
 * In Acid 2.0 we'll end up with 2 Delete events for the same PK. Logically that should be OK, but it may break the
 * Vectorized reader impl... need to check.
 *
 * A 1:M match from target to source results in an ambiguous write to the target - the SQL Standard expects an error.
 * (I have an argument on how to solve this with minor mods to the Join operator written down somewhere.)
 *
 * We only need 1 Stats task for MERGE (currently we get 1 per branch).
 * Should also eliminate the Move task - that's a general ACID task.
 */
private void logResults(List<String> r, String header, String prefix) {
LOG.info(prefix + " " + header);
StringBuilder sb = new StringBuilder();
int numLines = 0;
for(String line : r) {
numLines++;
sb.append(prefix).append(line).append("\n");
}
LOG.info(sb.toString());
LOG.info(prefix + " Printed " + numLines + " lines");
}
/**
* This tests that we handle non-trivial ON clause correctly
* @throws Exception
*/
@Test
public void testMerge() throws Exception {
int[][] baseValsOdd = {{5,5},{11,11}};
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(vals), r);
String query = "merge into " + Table.ACIDTBL +
" using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = a2 and b + 1 = source.b2 + 1 " +
"WHEN MATCHED THEN UPDATE set b = source.b2 " +
"WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)";
runStatementOnDriver(query);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{2,1},{4,3},{5,5},{5,6},{7,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
}
@Test
public void testMergeWithPredicate() throws Exception {
int[][] baseValsOdd = {{2,2},{5,5},{8,8},{11,11}};
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(vals), r);
String query = "merge into " + Table.ACIDTBL +
" t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " +
"WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " +
"WHEN NOT MATCHED and s.b2 >= 8 THEN INSERT VALUES(s.a2, s.b2)";
runStatementOnDriver(query);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{8,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
assertUniqueID(Table.ACIDTBL);
}
/**
* Test combines update + insert clauses
* @throws Exception
*/
@Test
public void testMerge2() throws Exception {
int[][] baseValsOdd = {{5,5},{11,11}};
int[][] baseValsEven = {{2,2},{4,44}};
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(vals), r);
String query = "merge into " + Table.ACIDTBL +
" using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
"WHEN MATCHED THEN UPDATE set b = source.b2 " +
"WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";//AND b < 1
r = runStatementOnDriver(query);
//r = runStatementOnDriver("explain " + query);
//logResults(r, "Explain logical1", "");
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
assertUniqueID(Table.ACIDTBL);
}
/**
* test combines delete + insert clauses
* @throws Exception
*/
@Test
public void testMerge3() throws Exception {
int[][] baseValsOdd = {{5,5},{11,11}};
int[][] baseValsEven = {{2,2},{4,44}};
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(vals), r);
String query = "merge into " + Table.ACIDTBL +
" using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
"WHEN MATCHED THEN DELETE " +
"WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";
runStatementOnDriver(query);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{7,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
}
@Test
public void testMultiInsert() throws Exception {
runStatementOnDriver("create temporary table if not exists data1 (x int)");
runStatementOnDriver("insert into data1 values (1),(2),(1)");
d.destroy();
d = new Driver(hiveConf);
runStatementOnDriver(" from data1 " +
"insert into " + Table.ACIDTBLPART + " partition(p) select 0, 0, 'p' || x "
+
"insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0, 1");
/**
 * Using {@link BucketCodec#V0} the output is missing 1 of the (p1,0,1) rows because they have
 * the same ROW__ID and differ only by statementId, so
 * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger} skips one.
 * With split update (and V0) the data is read correctly (insert deltas are now the base) but we
 * should still get duplicate ROW__IDs.
 */
List<String> r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
Assert.assertEquals("[p1\t0\t0, p1\t0\t0, p1\t0\t1, p1\t0\t1, p1\t0\t1, p2\t0\t0]", r.toString());
assertUniqueID(Table.ACIDTBLPART);
/**
* this delete + select covers VectorizedOrcAcidRowBatchReader
*/
runStatementOnDriver("delete from " + Table.ACIDTBLPART);
r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
Assert.assertEquals("[]", r.toString());
}
/**
* Investigating DP and WriteEntity, etc
* @throws Exception
*/
@Test
@Ignore
public void testDynamicPartitions() throws Exception {
d.destroy();
//In DbTxnManager.acquireLocks() we have
// 1 ReadEntity: default@values__tmp__table__1
// 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
Assert.assertEquals("4", r1.get(0));
//In DbTxnManager.acquireLocks() we have
// 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
// 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
//todo: side note on the above: LockRequestBuilder combines both default@acidtblpart entries into 1
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) select * from " + Table.ACIDTBLPART + " where p='p1'");
//In DbTxnManager.acquireLocks() we have
// 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
// 1 WriteEntity: default@acidtblpart@p=p2 Type=PARTITION WriteType=INSERT isDP=false
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='p2') select a,b from " + Table.ACIDTBLPART + " where p='p1'");
//In UpdateDeleteSemanticAnalyzer, after super analyze
// 3 ReadEntity: [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
// 1 WriteEntity: [default@acidtblpart TABLE/INSERT]
//after UDSA
// Read [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
// Write [default@acidtblpart@p=p1, default@acidtblpart@p=p2] - PARTITION/UPDATE, PARTITION/UPDATE
//todo: Why acquire per-partition locks - if you have many partitions that's hugely inefficient.
//We could acquire 1 table-level Shared_write instead
runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1");
//In UpdateDeleteSemanticAnalyzer, after super analyze
// Read [default@acidtblpart, default@acidtblpart@p=p1]
// Write default@acidtblpart TABLE/INSERT
//after UDSA
// Read [default@acidtblpart, default@acidtblpart@p=p1]
// Write [default@acidtblpart@p=p1] PARTITION/UPDATE
//todo: this causes a Read lock on the whole table - clearly overkill
//for Update/Delete we always write exactly (at most actually) the partitions we read
runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1 where p='p1'");
}
@Test
public void testDynamicPartitionsMerge() throws Exception {
d.destroy();
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
Assert.assertEquals("4", r1.get(0));
int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
runStatementOnDriver("merge into " + Table.ACIDTBLPART + " using " + Table.NONACIDORCTBL +
" as s ON " + Table.ACIDTBLPART + ".a = s.a " +
"when matched then update set b = s.b " +
"when not matched then insert values(s.a, s.b, 'new part')");
r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
String result = r1.toString();
Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result);
//note: inserts go into 'new part'... so this won't fail
assertUniqueID(Table.ACIDTBLPART);
}
/**
* Using nested partitions and thus DummyPartition
* @throws Exception
*/
@Test
public void testDynamicPartitionsMerge2() throws Exception {
d.destroy();
int[][] targetVals = {{1,1,1},{2,2,2},{3,3,1},{4,4,2}};
runStatementOnDriver("insert into " + Table.ACIDNESTEDPART + " partition(p=1,q) " + makeValuesClause(targetVals));
List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDNESTEDPART);
Assert.assertEquals("4", r1.get(0));
int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
runStatementOnDriver("merge into " + Table.ACIDNESTEDPART + " using " + Table.NONACIDORCTBL +
" as s ON " + Table.ACIDNESTEDPART + ".a = s.a " +
"when matched then update set b = s.b " +
"when not matched then insert values(s.a, s.b, 3,4)");
r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b");
Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1);
//insert of merge lands in part (3,4) - no updates land there
assertUniqueID(Table.ACIDNESTEDPART);
}
@Ignore("Covered elsewhere")
@Test
public void testMergeAliasedTarget() throws Exception {
int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
String query = "merge into " + Table.ACIDTBL +
" as target using " + Table.NONACIDORCTBL + " source ON target.a = source.a " +
"WHEN MATCHED THEN update set b = 0 " +
"WHEN NOT MATCHED THEN INSERT VALUES(source.a, source.b) ";
runStatementOnDriver(query);
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{2,0},{4,0},{5,0},{7,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
}
@Test
@Ignore("Values clause with table constructor not yet supported")
public void testValuesSource() throws Exception {
int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
String query = "merge into " + Table.ACIDTBL +
" as t using (select * from (values (2,2),(4,44),(5,5),(11,11)) as F(a,b)) s ON t.a = s.a " +
"WHEN MATCHED and s.a < 5 THEN DELETE " +
"WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
"WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
runStatementOnDriver(query);
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{5,6},{7,8},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
}
@Test
public void testBucketCodec() throws Exception {
d.destroy();
//insert data in "legacy" format
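//BucketCodec version 0 stored the raw bucket number in the ROW__ID bucket field; version 1 (the
//current codec) packs the codec version, bucket id and statement id into that int. This test mixes
//files written under both encodings and expects reads and compaction to reconcile them.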
hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 0);
d = new Driver(hiveConf);
int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
d.destroy();
hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 1);
d = new Driver(hiveConf);
//do some operations with new format
runStatementOnDriver("update " + Table.ACIDTBL + " set b=11 where a in (5,7)");
runStatementOnDriver("insert into " + Table.ACIDTBL + " values(11,11)");
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7");
//make sure we get the right data back before/after compactions
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
int[][] rExpected = {{2,1},{4,3},{5,11},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), r);
runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MINOR'");
runWorker(hiveConf);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(rExpected), r);
runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MAJOR'");
runWorker(hiveConf);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(rExpected), r);
}
/**
* Test the scenario when IOW comes in before a MAJOR compaction happens
* @throws Exception
*/
@Test
public void testInsertOverwrite1() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert two rows to an ACID table
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs in the location
Assert.assertEquals(2, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
}
// 2. INSERT OVERWRITE
// Prepare data for the source table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
// Insert overwrite ACID table from source table
runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select a,b from " + Table.NONACIDORCTBL);
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs, plus a base dir in the location
Assert.assertEquals(3, status.length);
boolean sawBase = false;
String baseDir = "";
int deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
baseDir = dirName;
Assert.assertTrue(baseDir.matches("base_.*"));
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertTrue(sawBase);
// Verify query result
List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
int [][] resultData = new int[][] {{5,6},{7,8}};
Assert.assertEquals(stringifyValues(resultData), rs);
// 3. Perform a major compaction. Nothing should change. Both deltas and base dirs should have the same name.
// Re-verify directory layout and query result by using the same logic as above
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs, plus a base dir in the location
Assert.assertEquals(3, status.length);
sawBase = false;
deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
Assert.assertTrue(dirName.matches("base_.*"));
Assert.assertEquals(baseDir, dirName);
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertTrue(sawBase);
// Verify query result
rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
Assert.assertEquals(stringifyValues(resultData), rs);
// 4. Run Cleaner. It should remove the 2 delta dirs.
runCleaner(hiveConf);
// There should be only 1 directory left: base_xxxxxxx.
// The delta dirs should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
Assert.assertEquals(baseDir, status[0].getPath().getName());
// Verify query result
rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
Assert.assertEquals(stringifyValues(resultData), rs);
}
/**
* Test the scenario when IOW comes in after a MAJOR compaction happens
* @throws Exception
*/
@Test
public void testInsertOverwrite2() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert two rows to an ACID table
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs in the location
Assert.assertEquals(2, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
}
// 2. Perform a major compaction. There should be an extra base dir now.
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs, plus a base dir in the location
Assert.assertEquals(3, status.length);
boolean sawBase = false;
int deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
Assert.assertTrue(dirName.matches("base_.*"));
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertTrue(sawBase);
// Verify query result
int [][] resultData = new int[][] {{1,2},{3,4}};
List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
Assert.assertEquals(stringifyValues(resultData), rs);
// 3. INSERT OVERWRITE
// Prepare data for the source table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
// Insert overwrite ACID table from source table
runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select a,b from " + Table.NONACIDORCTBL);
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs, plus 2 base dirs in the location
Assert.assertEquals(4, status.length);
int baseCount = 0;
deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
baseCount++;
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertEquals(2, baseCount);
// Verify query result
resultData = new int[][] {{5,6},{7,8}};
rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
Assert.assertEquals(stringifyValues(resultData), rs);
// 4. Perform another major compaction. Nothing should change. Both deltas and both base dirs
// should have the same name.
// Re-verify directory layout and query result by using the same logic as above
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
// There should be 2 delta dirs, plus 2 base dirs in the location
Assert.assertEquals(4, status.length);
baseCount = 0;
deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
Assert.assertTrue(dirName.matches("base_.*"));
baseCount++;
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertEquals(2, baseCount);
// Verify query result
rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
Assert.assertEquals(stringifyValues(resultData), rs);
// 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir.
runCleaner(hiveConf);
// There should be only 1 directory left: base_xxxxxxx.
// The delta dirs should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
// Verify query result
rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a");
Assert.assertEquals(stringifyValues(resultData), rs);
}
/**
* Create a table with schema evolution, and verify that no data is lost during (MR major)
* compaction.
*
* @throws Exception if a query fails
*/
@Test
public void testSchemaEvolutionCompaction() throws Exception {
String tblName = "schemaevolutioncompaction";
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT) " +
" PARTITIONED BY(part string)" +
" STORED AS ORC TBLPROPERTIES ('transactional'='true')");
// First INSERT round.
runStatementOnDriver("insert into " + tblName + " partition (part='aa') values (1)");
// ALTER TABLE ... ADD COLUMNS
runStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(b int)");
// Second INSERT round.
runStatementOnDriver("insert into " + tblName + " partition (part='aa') values (2, 2000)");
// Validate data
List<String> res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
Assert.assertEquals(2, res.size());
Assert.assertEquals("1\tNULL\taa", res.get(0));
Assert.assertEquals("2\t2000\taa", res.get(1));
// Compact
CompactionRequest compactionRequest =
new CompactionRequest("default", tblName, CompactionType.MAJOR);
compactionRequest.setPartitionname("part=aa");
txnHandler.compact(compactionRequest);
runWorker(hiveConf);
runCleaner(hiveConf);
// Verify successful compaction
List<ShowCompactResponseElement> compacts =
txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
Assert.assertEquals(1, compacts.size());
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
// Validate data after compaction
res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
Assert.assertEquals(2, res.size());
Assert.assertEquals("1\tNULL\taa", res.get(0));
Assert.assertEquals("2\t2000\taa", res.get(1));
}
/**
* Test cleaner for TXN_TO_WRITE_ID table.
* @throws Exception
*/
@Test
public void testCleanerForTxnToWriteId() throws Exception {
int[][] tableData1 = {{1, 2}};
int[][] tableData2 = {{2, 3}};
int[][] tableData3 = {{3, 4}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData1));
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3));
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData1));
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData2));
// All inserts are committed, so we expect 3 entries for acidTbl and 2 entries for acidTblPart
// in TXN_TO_WRITE_ID, as each insert allocates a writeid.
String acidTblWhereClause = " where t2w_database = " + quoteString("default")
+ " and t2w_table = " + quoteString(Table.ACIDTBL.name().toLowerCase());
String acidTblPartWhereClause = " where t2w_database = " + quoteString("default")
+ " and t2w_table = " + quoteString(Table.ACIDTBLPART.name().toLowerCase());
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.cleanTxnToWriteIdTable();
// After compaction/cleanup, all entries from TXN_TO_WRITE_ID should be cleaned up as all txns are committed.
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
// The following exercises a commit-abort-open-abort-commit sequence.
int[][] tableData4 = {{4, 5}};
int[][] tableData5 = {{5, 6}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=3) (a,b) " + makeValuesClause(tableData3));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData4));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
// Keep an open txn which refers to the aborted txn.
Context ctx = new Context(hiveConf);
HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
txnMgr.openTxn(ctx, "u1");
txnMgr.getValidTxns();
// Start an INSERT statement transaction and roll back this transaction.
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData5));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData5));
// We expect 4 entries in TXN_TO_WRITE_ID (3 for acidTbl, 1 for acidTblPart), as each insert,
// including the aborted ones, allocates a writeid.
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
// The entries relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID, as aborted
// txns are removed from TXNS only after compaction. Also, a committed txn > open txn is retained.
// As the open txn doesn't allocate a writeid, the entries for the aborted and committed txns should be retained.
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
// The cleaner will remove the aborted txns' data/metadata but cannot remove aborted txn2 from
// TXN_TO_WRITE_ID as there is an open txn < aborted txn2. Aborted txn1 < open txn and will be removed.
// Also, the committed txn > open txn is retained.
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
// Commit the open txn, which allows the cleanup of TXN_TO_WRITE_ID to proceed.
txnMgr.commitTxn();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
}
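/**
 * Lists the MM table's directory and asserts it contains exactly expectedDeltas delta dirs, each
 * holding a single 000000_0 file, then verifies the table still returns the expected two rows.
 */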
private void verifyDirAndResult(int expectedDeltas) throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
// Verify the content of subdirs
FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.MMTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
int sawDeltaTimes = 0;
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
sawDeltaTimes++;
FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, files.length);
Assert.assertTrue(files[0].getPath().getName().equals("000000_0"));
}
Assert.assertEquals(expectedDeltas, sawDeltaTimes);
// Verify query result
int [][] resultData = new int[][] {{1,2}, {3,4}};
List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL);
Assert.assertEquals(stringifyValues(resultData), rs);
}
@Test
public void testAcidOrcWritePreservesFieldNames() throws Exception {
// with vectorization
String tableName = "acidorcwritefieldnames";
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING) CLUSTERED BY (a) INTO " + BUCKET_COUNT + " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("INSERT INTO " + tableName + " VALUES (1, 'foo'), (2, 'bar')");
tableName = "acidorcwritefieldnames_complex";
runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING, s STRUCT<c:int, si:STRUCT<d:double," +
"e:float>>) CLUSTERED BY (a) INTO " + BUCKET_COUNT +
" BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("INSERT INTO " + tableName + " select a, b, named_struct('c',10,'si'," +
"named_struct('d',cast(1.0 as double),'e',cast(2.0 as float))) from acidorcwritefieldnames");
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tableName + "/" + AcidUtils.DELTA_PREFIX + "*/" + AcidUtils.BUCKET_PREFIX + "*"));
Assert.assertEquals(BUCKET_COUNT, fileStatuses.length);
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(hiveConf);
for (FileStatus fileStatus : fileStatuses) {
Reader r = OrcFile.createReader(fileStatus.getPath(), readerOptions);
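// In the ACID file schema (operation, originalTransaction, bucket, rowId, currentTransaction, row)
// child 5 is the "row" struct carrying the user columns whose field names we check here.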
TypeDescription rowSchema = r.getSchema().getChildren().get(5);
Assert.assertEquals("struct<a:int,b:string,s:struct<c:int,si:struct<d:double,e:float>>>", rowSchema.toString());
}
// without vectorization
tableName = "acidorcwritefieldnames";
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING) CLUSTERED BY (a) INTO " + BUCKET_COUNT + " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("INSERT INTO " + tableName + " VALUES (1, 'foo'), (2, 'bar')");
tableName = "acidorcwritefieldnames_complex";
runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING, s STRUCT<c:int, si:STRUCT<d:double," +
"e:float>>) CLUSTERED BY (a) INTO " + BUCKET_COUNT +
" BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("INSERT INTO " + tableName + " select a, b, named_struct('c',10,'si'," +
"named_struct('d',cast(1.0 as double),'e',cast(2.0 as float))) from acidorcwritefieldnames");
fs = FileSystem.get(hiveConf);
fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tableName + "/" + AcidUtils.DELTA_PREFIX + "*/" + AcidUtils.BUCKET_PREFIX + "*"));
Assert.assertEquals(BUCKET_COUNT, fileStatuses.length);
readerOptions = OrcFile.readerOptions(hiveConf);
for (FileStatus fileStatus : fileStatuses) {
Reader r = OrcFile.createReader(fileStatus.getPath(), readerOptions);
TypeDescription rowSchema = r.getSchema().getChildren().get(5);
Assert.assertEquals("struct<a:int,b:string,s:struct<c:int,si:struct<d:double,e:float>>>", rowSchema.toString());
}
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
}
/**
* takes raw data and turns it into a string as if from Driver.getResults()
* sorts rows in dictionary order
*/
static List<String> stringifyValues(int[][] rowsIn) {
return TxnCommandsBaseForTests.stringifyValues(rowsIn);
}
/**
* This tests that delete_delta_x_y dirs will be not produced during minor compaction if no input delete events.
* See HIVE-20941.
* @throws Exception
*/
@Test
public void testDeleteEventsCompaction() throws Exception {
int[][] tableData1 = {{1, 2}};
int[][] tableData2 = {{2, 3}};
int[][] tableData3 = {{3, 4}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData1));
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3));
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MINOR));
runWorker(hiveConf);
runCleaner(hiveConf);
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + Table.ACIDTBL.name().toLowerCase() + "/*"));
for(FileStatus fileStatus : fileStatuses) {
Assert.assertFalse(fileStatus.getPath().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX));
}
}
/**
 * takes raw data and turns it into a string as if from Driver.getResults()
 * preserves the input row order (no sorting)
 */
static List<String> stringifyValuesNoSort(int[][] rowsIn) {
assert rowsIn.length > 0;
int[][] rows = rowsIn.clone();
List<String> rs = new ArrayList<String>();
for(int[] row : rows) {
assert row.length > 0;
StringBuilder sb = new StringBuilder();
for(int value : row) {
sb.append(value).append("\t");
}
sb.setLength(sb.length() - 1);
rs.add(sb.toString());
}
return rs;
}
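/**
 * Builds a VALUES clause from the given rows, e.g. {{1,2},{3,4}} becomes " values(1,2),(3,4)".
 */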
static String makeValuesClause(int[][] rows) {
assert rows.length > 0;
StringBuilder sb = new StringBuilder(" values");
for(int[] row : rows) {
assert row.length > 0;
if(row.length > 1) {
sb.append("(");
}
for(int value : row) {
sb.append(value).append(",");
}
sb.setLength(sb.length() - 1);//remove trailing comma
if(row.length > 1) {
sb.append(")");
}
sb.append(",");
}
sb.setLength(sb.length() - 1);//remove trailing comma
return sb.toString();
}
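/**
 * Runs a statement through the Driver, rethrowing any CommandProcessorException as a
 * RuntimeException, and returns the fetched result rows.
 */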
protected List<String> runStatementOnDriver(String stmt) throws Exception {
LOG.info("+runStatementOnDriver(" + stmt + ")");
try {
d.run(stmt);
} catch (CommandProcessorException e) {
throw new RuntimeException(stmt + " failed: " + e);
}
List<String> rs = new ArrayList<String>();
d.getResults(rs);
return rs;
}
final void assertUniqueID(Table table) throws Exception {
String partCols = table.getPartitionColumns();
//check to make sure there are no duplicate ROW__IDs - HIVE-16832
StringBuilder sb = new StringBuilder("select ");
if(partCols != null && partCols.length() > 0) {
sb.append(partCols).append(",");
}
sb.append(" ROW__ID, count(*) from ").append(table).append(" group by ");
if(partCols != null && partCols.length() > 0) {
sb.append(partCols).append(",");
}
sb.append("ROW__ID having count(*) > 1");
List<String> r = runStatementOnDriver(sb.toString());
Assert.assertTrue("Duplicate ROW__ID: " + r.toString(),r.size() == 0);
}
static String quoteString(String input) {
return "'" + input + "'";
}
}