blob: 852cbb1dd23f9ff2c51e34c84a9b4997d99909c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
/**
* Tests related to support of LOAD DATA with Acid tables
* Most tests run in vectorized and non-vectorized mode since we currently have a vectorized and
* a non-vectorized acid readers and it's critical that ROW_IDs are generated the same way.
*/
public class TestTxnLoadData extends TxnCommandsBaseForTests {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnLoadData.class);
private static final String TEST_DATA_DIR =
new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnLoadData.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
@Rule
public TemporaryFolder folder= new TemporaryFolder();
@Before
public void setUp() throws Exception {
setUpInternal();
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}
@Override
protected String getTestDataDir() {
return TEST_DATA_DIR;
}
@Test
public void loadData() throws Exception {
loadData(false);
}
@Test
public void loadDataVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
loadData(true);
}
@Test
public void loadDataUpdate() throws Exception {
loadDataUpdate(false);
}
@Test
public void loadDataUpdateVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
loadDataUpdate(true);
}
@Test
public void loadDataNonAcid2AcidConversion() throws Exception {
loadDataNonAcid2AcidConversion(false);
}
@Test
public void loadDataNonAcid2AcidConversionVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
loadDataNonAcid2AcidConversion(true);
}
@Test
public void testMultiStatement() throws Exception {
testMultiStatement(false);
}
@Test
public void testMultiStatementVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
testMultiStatement(true);
}
private void loadDataUpdate(boolean isVectorized) throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver(
"create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
//Tstage is just a simple way to generate test data
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
//this creates an ORC data file with correct schema under table root
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
//"load data local inpath" doesn't delete source files so clean it here
runStatementOnDriver("truncate table Tstage");
//and do a Load Data into the same table, which should now land in a delta_x_x.
// 'data' is created by export command/
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][]{
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "load data inpath");
runStatementOnDriver("update T set b = 17 where a = 1");
String[][] expected2 = new String[][]{
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "update");
runStatementOnDriver("insert into T values(2,2)");
String[][] expectedInter2 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"}
};
checkResult(expectedInter2, testQuery, isVectorized, "insert");
runStatementOnDriver("delete from T where a = 3");
String[][] expectedInter3 = new String[][] {
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"}
};
checkResult(expectedInter3, testQuery, isVectorized, "delete");
//test minor compaction
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000032/bucket_00000"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000032/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "delete compact minor");
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T");
String[][] expected4 = new String[][]{
{"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"},
{"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}};
checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite");
//load same data again (additive)
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
String[][] expectedInt1 = new String[][] {
{"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"},
{"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"},
{"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000006_0000006_0000/000000_0"},
{"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000006_0000006_0000/000000_0"}
};
checkResult(expectedInt1, testQuery, isVectorized, "load data local inpath");
runStatementOnDriver("update T set b = 17 where a = 1");//matches 2 rows
runStatementOnDriver("delete from T where a = 3");//matches 2 rows
runStatementOnDriver("insert into T values(2,2)");
String[][] expected5 = new String[][]{
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
{"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000_0"}
};
checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
//test major compaction
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected6 = new String[][]{
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000046/bucket_00000"},
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000046/bucket_00000"},
{"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000046/bucket_00000"}
};
checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
}
private void loadData(boolean isVectorized) throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
runStatementOnDriver("insert into T values(0,2),(0,4)");
//Tstage is just a simple way to generate test data
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
//this creates an ORC data file with correct schema under table root
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
//"load data local inpath" doesn't delete source files so clean it here
runStatementOnDriver("truncate table Tstage");
//and do a Load Data into the same table, which should now land in a delta_x_x.
// 'data' is created by export command/
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
//normal insert
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"},
//Load Data
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "load data inpath");
//test minor compaction
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected1 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002_v0000024/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002_v0000024/bucket_00000"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002_v0000024/bucket_00000"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002_v0000024/bucket_00000"}
};
checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
//test major compaction
runStatementOnDriver("insert into T values(2,2)");
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected2 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003_v0000028/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003_v0000028/bucket_00000"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003_v0000028/bucket_00000"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003_v0000028/bucket_00000"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003_v0000028/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
//create more staging data and test Load Data Overwrite
runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
String[][] expected3 = new String[][] {
{"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"},
{"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}};
checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite");
//one more major compaction
runStatementOnDriver("insert into T values(6,6)");
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected4 = new String[][] {
{"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005_v0000037/bucket_00000"},
{"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005_v0000037/bucket_00000"},
{"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005_v0000037/bucket_00000"}};
checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
}
/**
* Load Data [overwrite] in to an (un-)partitioned acid converted table
*/
private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='false')");
//per acid write to test nonAcid2acid conversion mixed with load data
runStatementOnDriver("insert into T values(0,2),(0,4)");
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
//make 2 more inserts so that we have 000000_0_copy_1, 000000_0_copy_2 files in export
//export works at file level so if you have copy_N in the table dir, you'll have those in output
runStatementOnDriver("insert into Tstage values(2,2),(3,3)");
runStatementOnDriver("insert into Tstage values(4,4),(5,5)");
//create a file we'll import later
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
runStatementOnDriver("truncate table Tstage");//clean the staging table
//now convert T to acid
runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional' = 'true')");
//and do a Load Data into the same table, which should now land in a delta/
// (with 000000_0, 000000_0_copy_1, 000000_0_copy_2)
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
/*
{"writeid":0,"bucketid":536870912,"rowid":0} 0 2/000000_0
{"writeid":0,"bucketid":536870912,"rowid":1} 0 4/000000_0
{"writeid":1,"bucketid":536870912,"rowid":0} 4 4/delta_0000001_0000001_0000/000000_0
{"writeid":1,"bucketid":536870912,"rowid":1} 5 5/delta_0000001_0000001_0000/000000_0
*/
String[][] expected = new String[][] {
//from pre-acid insert
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
//from Load Data into acid converted table
{"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
"t/delta_10000001_10000001_0000/000000_0"},
{"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
"t/delta_10000001_10000001_0000/000000_0"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t2\t2",
"t/delta_10000001_10000001_0000/000001_0"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
"t/delta_10000001_10000001_0000/000001_0"},
{"{\"writeid\":10000001,\"bucketid\":537001984,\"rowid\":0}\t4\t4",
"t/delta_10000001_10000001_0000/000002_0"},
{"{\"writeid\":10000001,\"bucketid\":537001984,\"rowid\":1}\t5\t5",
"t/delta_10000001_10000001_0000/000002_0"},
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
//create more staging data with copy_N files and do LD+Overwrite
runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
runStatementOnDriver("insert into Tstage values(8,8)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
String[][] expected2 = new String[][] {
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
"t/base_10000002/000000_0"},
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_10000002/000000_0"},
{"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
"t/base_10000002/000001_0"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite");
//create 1 more delta_x_x so that compactor has > dir file to compact
runStatementOnDriver("insert into T values(9,9)");
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
"t/base_10000003_v0000034/bucket_00000"},
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
"t/base_10000003_v0000034/bucket_00000"},
{"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
"t/base_10000003_v0000034/bucket_00001"},
{"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t9\t9",
"t/base_10000003_v0000034/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
}
/**
* Load Data [overwrite] in to a partitioned transactional table
*/
@Test
public void loadDataPartitioned() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc tblproperties('transactional'='true')");
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
runStatementOnDriver("truncate table Tstage");//because 'local' inpath doesn't delete source files
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T partition(p=0)");
runStatementOnDriver("insert into Tstage values(1,2),(1,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
runStatementOnDriver("truncate table Tstage");
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' into table T partition(p=1)");
runStatementOnDriver("insert into Tstage values(2,2),(2,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/3'");
runStatementOnDriver("truncate table Tstage");
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/3/data' into table T partition(p=1)");
List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
String[][] expected = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000002_0000002_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000002_0000002_0000/000000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000003_0000003_0000/000000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000003_0000003_0000/000000_0"}};
checkExpected(rs, expected, "load data inpath partitioned");
runStatementOnDriver("insert into Tstage values(5,2),(5,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/4'");
runStatementOnDriver("truncate table Tstage");
runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)");
String[][] expected2 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000004/000000_0"},
{"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000004/000000_0"}};
rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
checkExpected(rs, expected2, "load data inpath partitioned overwrite");
}
/**
* By default you can't load into bucketed tables. Things will break badly in acid (data loss, etc)
* if loaded data is not bucketed properly. This test is to capture that this is still the default.
* If the default is changed, Load Data should probably do more validation to ensure data is
* properly distributed into files and files are named correctly.
* With the availability of new feature to rewrite such "load data" commands into insert-as-select,
* the test should let the load data pass.
*/
@Test
public void testValidations() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')");
File createdFile= folder.newFile("myfile.txt");
FileUtils.writeStringToFile(createdFile, "hello world");
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
//this creates an ORC data file with correct schema under table root
runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
// This will work with the new support of rewriting load into IAS.
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/Tstage' into table T");
}
private void checkExpected(List<String> rs, String[][] expected, String msg) {
super.checkExpected(rs, expected, msg, LOG, true);
}
@Test
public void testMMOrcTable() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='insert_only')");
int[][] values = {{1,2},{3,4}};
runStatementOnDriver("insert into T " + makeValuesClause(values));
List<String> rs = runStatementOnDriver("select a, b from T order by b");
Assert.assertEquals("", stringifyValues(values), rs);
}
/**
* Make sure Load Data assigns ROW_IDs correctly when there is statementId suffix on delta dir
* For example, delta_x_x_0001.
*/
private void testMultiStatement(boolean isVectorized) throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
//Tstage is just a simple way to generate test data
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(5,5),(6,6)");
//this creates an ORC data file with correct schema under table root
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
//and do a Load Data into the same table, which should now land in a delta_x_x.
// 'data' is created by export command/
runStatementOnDriver("START TRANSACTION");
//statementId = 0
runStatementOnDriver("insert into T values(1,2),(3,4)");
//statementId = 1
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
runStatementOnDriver("COMMIT");
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"}
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected2 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001_v0000022/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001_v0000022/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001_v0000022/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001_v0000022/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
//at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
}
@Test
public void testAbort() throws Exception {
boolean isVectorized = false;
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
//Tstage is just a simple way to generate test data
runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(5,5),(6,6)");
//this creates an ORC data file with correct schema under table root
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
//and do a Load Data into the same table, which should now land in a delta_x_x.
// 'data' is created by export command/
runStatementOnDriver("insert into T values(1,2),(3,4)");
runStatementOnDriver("START TRANSACTION");
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
runStatementOnDriver("ROLLBACK");
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"}
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
}
void checkResult(String[][] expectedResult, String query, boolean isVectorized,
String msg) throws Exception{
checkResult(expectedResult, query, isVectorized, msg, LOG);
}
@Test
public void testLoadAcidFile() throws Exception {
MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists T2");
runStatementOnDriver(
"create table T (a int, b int) stored as orc");
//This is just a simple way to generate test data
runStatementOnDriver("create table T2(a int, b int) stored as orc");
runStatementOnDriver("insert into T values(1,2)");
List<String> rs = runStatementOnDriver("select INPUT__FILE__NAME from T");
Assert.assertEquals(1, rs.size());
Assert.assertTrue("Unexpcted file name", rs.get(0)
.endsWith("t/delta_0000001_0000001_0000/bucket_00000_0"));
//T2 is an acid table so this should fail
CommandProcessorException e =
runStatementOnDriverNegative("load data local inpath '" + rs.get(0) + "' into table T2");
Assert.assertEquals("Unexpected error code",
ErrorMsg.LOAD_DATA_ACID_FILE.getErrorCode(), e.getErrorCode());
}
}