blob: 4d25f88da5434a3cdf28ab1d6b8fdc4637cf02df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import java.io.File;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests here are for micro-managed tables:
* specifically INSERT OVERWRITE statements and Major/Minor Compactions.
*/
public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommandsForMmTable.class);
protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
@Override
protected String getTestDataDir() {
return TEST_DATA_DIR;
}
enum TableExtended {
NONACIDPART("nonAcidPart", "p"),
MMTBL("mmTbl"),
MMTBL2("mmTbl2"),
MMTBLPART("mmTblPart","p");
final String name;
final String partitionColumns;
@Override
public String toString() {
return name;
}
String getPartitionColumns() {
return partitionColumns;
}
TableExtended(String name) {
this(name, null);
}
TableExtended(String name, String partitionColumns) {
this.name = name;
this.partitionColumns = partitionColumns;
}
}
@Override
@Before
public void setUp() throws Exception {
super.setUpInternal();
setUpInternalExtended(false);
}
void setUpInternalExtended(boolean isOrcFormat) throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING, true);
hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
runStatementOnDriver("create table " + TableExtended.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
if (!isOrcFormat) {
runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
runStatementOnDriver("create table " + TableExtended.MMTBL2 + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
runStatementOnDriver("create table " + TableExtended.MMTBLPART + "(a int, b int) partitioned by (p string) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
} else {
runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
runStatementOnDriver("create table " + TableExtended.MMTBL2 + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
runStatementOnDriver("create table " + TableExtended.MMTBLPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
}
}
protected void dropTables() throws Exception {
super.dropTables();
for(TestTxnCommandsForMmTable.TableExtended t : TestTxnCommandsForMmTable.TableExtended.values()) {
runStatementOnDriver("drop table if exists " + t);
}
}
/**
* Test a scenario, on a micro-managed table, where an IOW comes in
* after a MAJOR compaction, and then a MINOR compaction is initiated.
*
* @throws Exception
*/
@Test
public void testInsertOverwriteForMmTable() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert two rows to an MM table
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs in the location
Assert.assertEquals(2, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
}
// 2. INSERT OVERWRITE
// Prepare data for the source table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
// Insert overwrite MM table from source table
runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select a,b from " + Table.NONACIDORCTBL);
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs, plus 1 base dir in the location
Assert.assertEquals(3, status.length);
int baseCount = 0;
int deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
baseCount++;
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertEquals(1, baseCount);
// Verify query result
int[][] resultData = new int[][] {{5,6},{7,8}};
List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(stringifyValues(resultData), rs);
}
/**
* Test a scenario, on a partitioned micro-managed table, that an IOW comes in
* before a MAJOR compaction happens.
*
* @throws Exception
*/
@Test
public void testInsertOverwriteForPartitionedMmTable() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert two rows to a partitioned MM table.
int[][] valuesOdd = {{5,6},{7,8}};
int[][] valuesEven = {{2,1},{4,3}};
runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd));
runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven));
// Verify dirs
String[] pStrings = {"/p=odd", "/p=even"};
for(int i=0; i < pStrings.length; i++) {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 1 delta dir per partition location
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("delta_.*"));
}
// 2. INSERT OVERWRITE
// Prepare data for the source table
int[][] newValsOdd = {{5,5},{11,11}};
int[][] newValsEven = {{2,2}};
runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd));
runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven));
// Insert overwrite MM table from source table
List<String> rs = null;
String s = "insert overwrite table " + TableExtended.MMTBLPART + " PARTITION(p='odd') " +
" select a,b from " + TableExtended.NONACIDPART + " where " + TableExtended.NONACIDPART + ".p='odd'";
rs = runStatementOnDriver("explain formatted " + s);
LOG.info("Explain formatted: " + rs.toString());
runStatementOnDriver(s);
s = "insert overwrite table " + TableExtended.MMTBLPART + " PARTITION(p='even') " +
" select a,b from " + TableExtended.NONACIDPART + " where " + TableExtended.NONACIDPART + ".p='even'";
runStatementOnDriver(s);
// Verify resulting dirs.
boolean sawBase = false;
String[] baseDirs = {"", ""};
int deltaCount = 0;
for(int h=0; h < pStrings.length; h++) {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 1 delta dir, plus a base dir in the location
Assert.assertEquals(2, status.length);
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
baseDirs[h] = dirName;
Assert.assertTrue(baseDirs[h].matches("base_.*"));
}
}
Assert.assertEquals(1, deltaCount);
Assert.assertTrue(sawBase);
deltaCount = 0;
sawBase = false;
}
// Verify query result
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='even' order by a,b");
int [][] rExpectedEven = new int[][] {{2,2}};
Assert.assertEquals(stringifyValues(rExpectedEven), rs);
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='odd' order by a,b");
int [][] rExpectedOdd = new int[][] {{5,5},{11,11}};
Assert.assertEquals(stringifyValues(rExpectedOdd), rs);
// 3. Perform a major compaction. Nothing should change.
// Both deltas and base dirs should have the same name.
// Re-verify directory layout and query result by using the same logic as above
runStatementOnDriver("alter table "+ TableExtended.MMTBLPART + " PARTITION(p='odd') " + " compact 'MAJOR'" );
runWorker(hiveConf);
runStatementOnDriver("alter table "+ TableExtended.MMTBLPART + " PARTITION(p='even') " + " compact 'MAJOR'" );
runWorker(hiveConf);
for(int h=0; h < pStrings.length; h++) {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs, plus a base dir in the location
Assert.assertEquals(2, status.length);
sawBase = false;
deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
Assert.assertTrue("BASE ERROR: " + dirName, dirName.matches("base_.*"));
Assert.assertEquals(baseDirs[h], dirName);
}
}
Assert.assertEquals(1, deltaCount);
Assert.assertTrue(sawBase);
deltaCount = 0;
sawBase = false;
}
// Verify query result
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b");
int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), rs);
// 4. Run Cleaner. It should remove the 2 delta dirs.
runCleaner(hiveConf);
// There should be only 1 directory left: base_xxxxxxx.
// The delta dirs should have been cleaned up.
for(int h=0; h < pStrings.length; h++) {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
Assert.assertEquals(baseDirs[h], status[0].getPath().getName());
}
// Verify query result
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b");
Assert.assertEquals(stringifyValues(rExpected), rs);
}
/**
* Test a scenario, on a dynamically partitioned micro-managed table, that an IOW comes in
* before a MAJOR compaction happens.
*
* @throws Exception
*/
@Test
public void testInsertOverwriteWithDynamicPartition() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert two rows to a partitioned MM table.
int[][] valuesOdd = {{5,6},{7,8}};
int[][] valuesEven = {{2,1},{4,3}};
runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd));
runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven));
// Verify dirs
String[] pStrings = {"/p=odd", "/p=even"};
for(int i=0; i < pStrings.length; i++) {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 1 delta dir per partition location
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("delta_.*"));
}
// 2. INSERT OVERWRITE
// Prepare data for the source table
int[][] newValsOdd = {{5,5},{11,11}};
int[][] newValsEven = {{2,2}};
runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd));
runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven));
runStatementOnDriver("insert overwrite table " + TableExtended.MMTBLPART + " partition(p) select a,b,p from " + TableExtended.NONACIDPART);
// Verify resulting dirs.
boolean sawBase = false;
String[] baseDirs = {"", ""};
int deltaCount = 0;
for(int h=0; h < pStrings.length; h++) {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 1 delta dir, plus a base dir in the location
Assert.assertEquals(2, status.length); // steve
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
baseDirs[h] = dirName;
Assert.assertTrue(baseDirs[h].matches("base_.*"));
}
}
Assert.assertEquals(1, deltaCount);
Assert.assertTrue(sawBase);
deltaCount = 0;
sawBase = false;
}
// Verify query result
List<String> rs = null;
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='even' order by a,b");
int [][] rExpectedEven = new int[][] {{2,2}};
Assert.assertEquals(stringifyValues(rExpectedEven), rs);
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='odd' order by a,b");
int [][] rExpectedOdd = new int[][] {{5,5},{11,11}};
Assert.assertEquals(stringifyValues(rExpectedOdd), rs);
// Verify query result
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b");
int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}};
Assert.assertEquals(stringifyValues(rExpected), rs);
}
@Test
public void testInsertOverwriteWithUnionAll() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
// 1. Insert two rows to an MM table
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs in the location
Assert.assertEquals(2, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
}
// 2. Insert Overwrite.
int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + TestTxnCommands2.makeValuesClause(values));
runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select a,b from " + Table.NONACIDORCTBL + " where a between 1 and 3 union all select a,b from " + Table.NONACIDORCTBL + " where a between 5 and 7");
// Verify resulting dirs.
boolean sawBase = false;
String baseDir = "";
int deltaCount = 0;
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs, plus a base dir in the location
Assert.assertEquals(3, status.length);
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
baseDir = dirName;
Assert.assertTrue(baseDir.matches("base_.*"));
}
}
Assert.assertEquals(2, deltaCount);
Assert.assertTrue(sawBase);
List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
int[][] rExpected = new int[][] {{1,2},{2,4},{5,6},{6,8}};
Assert.assertEquals(stringifyValues(rExpected), rs);
// 4. Perform a major compaction.
runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
runWorker(hiveConf);
// 5. Run Cleaner. It should remove the 2 delta dirs.
runCleaner(hiveConf);
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(stringifyValues(rExpected), rs);
// Verify resulting dirs.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be one base dir in the location
Assert.assertEquals(1, status.length);
sawBase = false;
deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
sawBase = true;
baseDir = dirName;
Assert.assertTrue(baseDir.matches("base_.*"));
}
}
Assert.assertEquals(0, deltaCount);
Assert.assertTrue(sawBase);
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(stringifyValues(rExpected), rs);
}
@Test
public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception {
// Insert two rows into the table.
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
// There should be 2 delta directories
verifyDirAndResult(2);
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
// Initiate a major compaction request on the table.
runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'");
// Run worker.
runWorker(hiveConf);
verifyDirAndResult(2, true);
// Run Cleaner.
runCleaner(hiveConf);
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
0,
TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
verifyDirAndResult(0, true);
}
@Test
public void testImpalaTruncatedMmTableVectorized() throws Exception {
testImpalaTruncatedMmTable(true);
}
@Test
public void testImpalaTruncatedMmTableNonVectorized() throws Exception {
testImpalaTruncatedMmTable(false);
}
/**
* Impala truncates insert-only tables by writing a base directory (like insert overwrite) containing a completely
* empty file. Make sure that Hive reads these bases correctly.
*
* @throws Exception
*/
private void testImpalaTruncatedMmTable(boolean vectorized) throws Exception {
if (!vectorized) {
d.getConf().setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
Path tblLocation = new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase());
// 1. Insert two rows to an MM table
runStatementOnDriver("drop table " + TableExtended.MMTBL);
runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int,b int) stored as parquet "
+ "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
status = fs.listStatus(tblLocation, FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs in the location
Assert.assertEquals(2, status.length);
for (int i = 0; i < status.length; i++) {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
}
// 2. Simulate Impala truncating the table: write a base dir (base_0000003) containing a file with no data. We
// have to delete this file (it's not completely empty, it contains metadata) and create completely empty file
runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select * from "
+ TableExtended.MMTBL + " where 1=2");
status = fs.listStatus(tblLocation, FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 delta dirs, plus 1 base dir in the location
Assert.assertEquals(3, status.length);
verifyDir(2, true);
Path basePath = new Path(tblLocation, "base_0000003");
Assert.assertTrue("Deleting file under base failed", fs.delete(new Path(basePath, "000000_0")));
fs.create(new Path(basePath, "empty"));
// 3. Verify query result. Selecting from a truncated table should return nothing.
List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(Collections.emptyList(), rs);
// 4. Perform a major compaction. Cleaner should remove the 2 delta dirs.
runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
runWorker(hiveConf);
runCleaner(hiveConf);
verifyDir(0, true);
rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(Collections.emptyList(), rs);
if (!vectorized) {
d.getConf().setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
}
}
private void verifyDirAndResult(int expectedDeltas) throws Exception {
verifyDirAndResult(expectedDeltas, false);
}
private void verifyDirAndResult(int expectedDeltas, boolean expectBaseDir) throws Exception {
verifyDir(expectedDeltas, expectBaseDir);
// Verify query result
int [][] resultData = new int[][] {{1,2}, {3,4}};
List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(stringifyValues(resultData), rs);
}
private void verifyDir(int expectedDeltas, boolean expectBaseDir) throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
// Verify the content of subdirs
FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
int sawDeltaTimes = 0;
int sawBaseTimes = 0;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
sawDeltaTimes++;
FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, files.length);
Assert.assertEquals("000000_0", files[0].getPath().getName());
} else {
sawBaseTimes++;
}
}
Assert.assertEquals(expectedDeltas, sawDeltaTimes);
if (expectBaseDir) {
Assert.assertEquals("1 base directory expected", 1, sawBaseTimes);
} else {
Assert.assertEquals("0 base directories expected", 0, sawBaseTimes);
}
}
}