| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.ql; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.conf.Constants; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; |
| import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; |
| import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf; |
| import org.apache.hadoop.hive.metastore.txn.TxnStore; |
| import org.apache.hadoop.hive.metastore.txn.TxnUtils; |
| import org.apache.hadoop.hive.ql.io.BucketCodec; |
| import org.apache.hadoop.hive.ql.metadata.Hive; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessorException; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| public class TestTxnNoBuckets extends TxnCommandsBaseForTests { |
private static final Logger LOG = LoggerFactory.getLogger(TestTxnNoBuckets.class);
| private static final String NO_BUCKETS_TBL_NAME = "nobuckets"; |
| private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + |
| File.separator + TestTxnNoBuckets.class.getCanonicalName() |
| + "-" + System.currentTimeMillis() |
| ).getPath().replaceAll("\\\\", "/"); |
| @Rule |
| public TestName testName = new TestName(); |
| @Override |
| protected String getTestDataDir() { |
| return TEST_DATA_DIR; |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| setUpInternal(); |
| //see TestTxnNoBucketsVectorized for vectorized version |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); |
| } |
| |
| private boolean shouldVectorize() { |
| return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); |
| } |
| /** |
| * Tests that Acid can work with un-bucketed tables. |
| */ |
| @Test |
| public void testNoBuckets() throws Exception { |
| int[][] sourceVals1 = {{0,0,0},{3,3,3}}; |
| int[][] sourceVals2 = {{1,1,1},{2,2,2}}; |
| runStatementOnDriver("drop table if exists tmp"); |
| runStatementOnDriver("create table tmp (c1 integer, c2 integer, c3 integer) stored as orc tblproperties('transactional'='false')"); |
| runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1)); |
| runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2)); |
| runStatementOnDriver(String.format("drop table if exists %s", NO_BUCKETS_TBL_NAME)); |
| runStatementOnDriver("create table " + NO_BUCKETS_TBL_NAME + " (c1 integer, c2 integer, c3 integer) stored " + |
| "as orc tblproperties('transactional'='true', 'transactional_properties'='default')"); |
| String stmt = String.format("insert into %s select * from tmp", NO_BUCKETS_TBL_NAME); |
| runStatementOnDriver(stmt); |
| List<String> rs = runStatementOnDriver( |
| String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by ROW__ID", NO_BUCKETS_TBL_NAME)); |
| Assert.assertEquals("", 4, rs.size()); |
| LOG.warn("after insert"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
/** The insert creates 2 output files (presumably because there are 2 input files).
 * The number in the file name is the writerId - the same number that is encoded in
 * ROW__ID.bucketId - see {@link org.apache.hadoop.hive.ql.io.BucketCodec} */
| Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t0\t")); |
| Assert.assertTrue(rs.get(0), rs.get(0).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0")); |
| Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t")); |
| Assert.assertTrue(rs.get(1), rs.get(1).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0")); |
| Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t")); |
| Assert.assertTrue(rs.get(2), rs.get(2).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0")); |
| Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); |
| Assert.assertTrue(rs.get(3), rs.get(3).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0")); |
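//sanity-check the bucketId encoding (an illustrative addition, mirroring the BucketCodec
//assertions used later in this class): codec V1 stores the version in the top 3 bits
//(1 << 29 = 536870912) and the writerId at bit 16, so writerId n == 536870912 + (n << 16)
Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448));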
| |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); |
| rs = runStatementOnDriver(String.format("explain update %s set c3 = 17 where c3 in(0,1)", NO_BUCKETS_TBL_NAME)); |
| LOG.warn("Query Plan: "); |
| for (String s : rs) { |
| LOG.warn(s); |
| } |
| |
| runStatementOnDriver(String.format("update %s set c3 = 17 where c3 in(0,1)", NO_BUCKETS_TBL_NAME)); |
| rs = runStatementOnDriver( |
| String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID", |
| NO_BUCKETS_TBL_NAME)); |
| LOG.warn("after update"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t")); |
| Assert.assertTrue(rs.get(0), rs.get(0).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0")); |
| Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); |
| Assert.assertTrue(rs.get(1), rs.get(1).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0")); |
//the update has 1 writer, which creates the bucket files where the updated rows land
| Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t")); |
| Assert.assertTrue(rs.get(2), rs.get(2).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000")); |
| // update for "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t" |
| Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t")); |
| Assert.assertTrue(rs.get(3), rs.get(3).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001")); |
| |
| Set<String> expectedFiles = new HashSet<>(); |
//both delete events land in the buckets corresponding to the original rows' bucketIds
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00000"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00001"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001"); |
| //check that we get the right files on disk |
| assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME); |
| //todo: it would be nice to check the contents of the files... could use orc.FileDump - it has |
| // methods to print to a supplied stream but those are package private |
| |
| runStatementOnDriver(String.format("alter table %s compact 'major'", NO_BUCKETS_TBL_NAME)); |
| TestTxnCommands2.runWorker(hiveConf); |
| rs = runStatementOnDriver( |
| String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID", |
| NO_BUCKETS_TBL_NAME)); |
| LOG.warn("after major compact"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| /* |
| ├── base_0000002_v0000025 |
| │  ├── bucket_00000 |
| │  └── bucket_00001 |
| ├── delete_delta_0000002_0000002_0000 |
| │  └── bucket_00000 |
| | └── bucket_00001 |
| ├── delta_0000001_0000001_0000 |
| │  ├── bucket_00000 |
| │  └── bucket_00001 |
| └── delta_0000002_0000002_0000 |
| └── bucket_00000 |
| */ |
| |
String[][] expected = {
| {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"}, |
| {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"}, |
| {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"} |
| }; |
| checkResult(expected, |
| "select ROW__ID, c1, c2, c3" + (shouldVectorize() ? "" : ", INPUT__FILE__NAME") |
| + " from " + NO_BUCKETS_TBL_NAME + " order by c1, c2, c3", |
| shouldVectorize(), |
| "After Major Compaction", LOG); |
| |
| expectedFiles.clear(); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00000"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00001"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"); |
| assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME); |
| |
| TestTxnCommands2.runCleaner(hiveConf); |
| rs = runStatementOnDriver(String.format("select c1, c2, c3 from %s order by c1, c2, c3", NO_BUCKETS_TBL_NAME)); |
| int[][] result = {{0,0,17},{1,1,17},{2,2,2},{3,3,3}}; |
| Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs); |
| |
| expectedFiles.clear(); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"); |
| expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"); |
| assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME); |
| } |
| |
| @Test |
| public void testNoBucketsDP() throws Exception { |
| int[][] sourceVals1 = {{0,0,0},{3,3,3}}; |
| int[][] sourceVals2 = {{1,1,1},{2,2,2}}; |
| int[][] sourceVals3 = {{3,3,3},{4,4,4}}; |
| int[][] sourceVals4 = {{5,5,5},{6,6,6}}; |
| runStatementOnDriver("drop table if exists tmp"); |
| runStatementOnDriver("create table tmp (c1 integer, c2 integer) partitioned by (c3 integer) stored as orc " + |
| "tblproperties('transactional'='false')"); |
| runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1)); |
| runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2)); |
| runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals3)); |
| runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals4)); |
| runStatementOnDriver("drop table if exists " + NO_BUCKETS_TBL_NAME); |
| runStatementOnDriver(String.format("create table %s (c1 integer, c2 integer) partitioned by (c3 integer) stored " + |
| "as orc tblproperties('transactional'='true', 'transactional_properties'='default')", NO_BUCKETS_TBL_NAME)); |
| String stmt = String.format("insert into %s partition(c3) select * from tmp", NO_BUCKETS_TBL_NAME); |
| runStatementOnDriver(stmt); |
| List<String> rs = runStatementOnDriver( |
| String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by ROW__ID", NO_BUCKETS_TBL_NAME)); |
| Assert.assertEquals("", 8, rs.size()); |
| LOG.warn("after insert"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| |
| rs = runStatementOnDriver( |
| String.format("select * from %s where c2 in (0,3)", NO_BUCKETS_TBL_NAME)); |
| Assert.assertEquals(3, rs.size()); |
| runStatementOnDriver(String.format("update %s set c2 = 17 where c2 in(0,3)", NO_BUCKETS_TBL_NAME)); |
| rs = runStatementOnDriver( |
| String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID", |
| NO_BUCKETS_TBL_NAME)); |
| LOG.warn("after update"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| rs = runStatementOnDriver(String.format("select * from %s where c2=17", NO_BUCKETS_TBL_NAME)); |
| Assert.assertEquals(3, rs.size()); |
| } |
| |
| /** |
| * See CTAS tests in TestAcidOnTez |
| */ |
| @Test |
| public void testCTAS() throws Exception { |
| runStatementOnDriver("drop table if exists myctas"); |
| int[][] values = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values)); |
| runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" + |
| "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL); |
| List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas order by ROW__ID"); |
String[][] expected = {
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas/delta_0000001_0000001_0000/bucket_00000"}, |
| {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/myctas/delta_0000001_0000001_0000/bucket_00001"}, |
| }; |
| checkExpected(rs, expected, "Unexpected row count after ctas from non acid table"); |
| |
| runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); |
//todo: try this with acid default - it seems making the table acid in a listener is too late
| runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" + |
| "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL); |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID"); |
String[][] expected2 = {
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000"}, |
| {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00001"} |
| }; |
| checkExpected(rs, expected2, "Unexpected row count after ctas from acid table"); |
| |
| runStatementOnDriver("create table myctas3 stored as ORC TBLPROPERTIES ('transactional" + |
| "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL + |
| " union all select a, b from " + Table.ACIDTBL); |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas3 order by ROW__ID"); |
String[][] expected3 = {
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00000"}, |
| {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00001"}, |
| {"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00002"}, |
| {"{\"writeid\":1,\"bucketid\":537067520,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00003"}, |
| }; |
| checkExpected(rs, expected3, "Unexpected row count after ctas from union all query"); |
| |
| runStatementOnDriver("create table myctas4 stored as ORC TBLPROPERTIES ('transactional" + |
| "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL + |
| " union distinct select a, b from " + Table.ACIDTBL); |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas4 order by ROW__ID"); |
String[][] expected4 = {
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0000/bucket_00000"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0000/bucket_00000"}, |
| }; |
| checkExpected(rs, expected4, "Unexpected row count after ctas from union distinct query"); |
| } |
| @Test |
| public void testCtasEmpty() throws Exception { |
| MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); |
| runStatementOnDriver("drop table if exists myctas"); |
| runStatementOnDriver("create table myctas stored as ORC as" + |
| " select a, b from " + Table.NONACIDORCTBL); |
| List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME" + |
| " from myctas order by ROW__ID"); |
| } |
| |
| /** |
| * Insert into unbucketed acid table from union all query |
| * Union All is flattened so nested subdirs are created and acid move drops them since |
| * delta dirs have unique names |
| */ |
| @Test |
| public void testInsertToAcidWithUnionRemove() throws Exception { |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); |
| hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); |
| d.close(); |
| d = new Driver(hiveConf); |
| int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; |
| runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001 |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); |
| /* |
| So Union All removal kicks in and we get 3 subdirs in staging. |
| ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505516390532/warehouse/t/.hive-staging_hive_2017-09-15_16-05-06_895_1123322677843388168-1/ |
| └── -ext-10000 |
| ├── HIVE_UNION_SUBDIR_19 |
| │  └── 000000_0 |
| │  ├── _orc_acid_version |
| │  └── delta_0000016_0000016_0001 |
| ├── HIVE_UNION_SUBDIR_20 |
| │  └── 000000_0 |
| │  ├── _orc_acid_version |
| │  └── delta_0000016_0000016_0002 |
| └── HIVE_UNION_SUBDIR_21 |
| └── 000000_0 |
| ├── _orc_acid_version |
| └── delta_0000016_0000016_0003*/ |
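//each branch of the flattened union writes a delta with a distinct statementId suffix
//(_0001/_0002/_0003, also visible in the bucketids of the expected rows below), so when
//the union subdirs are dropped the moved deltas don't collide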
| runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); |
| |
| List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); |
| |
String[][] expected = {
| {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001_0"}, |
| }; |
| checkExpected(rs, expected, "Unexpected row count after ctas"); |
| } |
| @Test |
| public void testInsertOverwriteToAcidWithUnionRemove() throws Exception { |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); |
| hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); |
| d.close(); |
| d = new Driver(hiveConf); |
| int[][] values = {{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}}; |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); |
| |
| CommandProcessorException e = runStatementOnDriverNegative( |
| "insert overwrite table T select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + |
| " where a between 1 and 3 group by a, b union all select a, b from " + |
| TxnCommandsBaseForTests.Table.ACIDTBL + |
| " where a between 5 and 7 union all select a, b from " + |
| TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); |
| Assert.assertTrue("", e.getMessage().contains("not supported due to OVERWRITE and UNION ALL")); |
| } |
| /** |
| * The idea here is to create a non acid table that was written by multiple writers, i.e. |
| * unbucketed table that has 000000_0 & 000001_0, for example. |
| * Also, checks that we can handle a case when data files can be at multiple levels (subdirs) |
| * of the table. |
| */ |
| @Test |
| public void testToAcidConversionMultiBucket() throws Exception { |
| //need to disable these so that automatic merge doesn't merge the files |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false); |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); |
| d.close(); |
| d = new Driver(hiveConf); |
| |
| int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')"); |
| runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + |
| " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + |
| " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9"); |
| runStatementOnDriver("insert into T values(12,12)"); |
| |
| List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); |
//the previous insert+union creates 3 data files (000000_0, 000001_0, 000002_0)
//insert (12,12) creates 000000_0_copy_1
String[][] expected = {
| {"1\t2", "warehouse/t/000002_0"}, |
| {"2\t4", "warehouse/t/000002_0"}, |
| {"5\t6", "warehouse/t/000000_0"}, |
| {"6\t8", "warehouse/t/000001_0"}, |
| {"9\t10", "warehouse/t/000000_0"}, |
| {"12\t12", "warehouse/t/000000_0_copy_1"} |
| }; |
| checkExpected(rs, expected,"before converting to acid"); |
| |
| //now do Insert from Union here to create data files in sub dirs |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); |
| hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); |
| d.close(); |
| d = new Driver(hiveConf); |
| runStatementOnDriver("insert into T(a,b) select a * 10, b * 10 from " + Table.ACIDTBL + |
| " where a between 1 and 3 group by a, b union all select a * 10, b * 10 from " + Table.ACIDTBL + |
| " where a between 5 and 7"); |
| //now we have a table with data files at multiple different levels. |
String[][] expected1 = {
| {"1\t2", "warehouse/t/000002_0"}, |
| {"2\t4", "warehouse/t/000002_0"}, |
| {"5\t6", "warehouse/t/000000_0"}, |
| {"6\t8", "warehouse/t/000001_0"}, |
| {"9\t10", "warehouse/t/000000_0"}, |
| {"10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, |
| {"12\t12", "warehouse/t/000000_0_copy_1"}, |
| {"20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, |
| {"50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, |
| {"60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"} |
| }; |
| rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); |
| checkExpected(rs, expected1,"before converting to acid (with multi level data layout)"); |
| |
| //make it an Acid table and make sure we assign ROW__IDs correctly |
| runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')"); |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); |
| /** |
| now that T is Acid, data for each writerId is treated like a logical bucket (though T is not |
| bucketed), so rowid are assigned per logical bucket (e.g. 000000_0 + 000000_0_copy_1 + subdirs). |
| {@link AcidUtils.Directory#getOriginalFiles()} ensures consistent ordering of files within |
| logical bucket (tranche) |
| */ |
String[][] expected2 = {
| {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"}, |
| {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/000002_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "warehouse/t/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t6\t8", "warehouse/t/000001_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10", "warehouse/t/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"}, |
| }; |
| checkExpected(rs, expected2,"after converting to acid (no compaction)"); |
| Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); |
| Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984)); |
| Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448)); |
| |
| assertVectorized(shouldVectorize(), "update T set b = 88 where b = 80"); |
| runStatementOnDriver("update T set b = 88 where b = 80"); |
| assertVectorized(shouldVectorize(), "delete from T where b = 8"); |
| runStatementOnDriver("delete from T where b = 8"); |
String[][] expected3 = {
| {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"}, |
| {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/000002_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "warehouse/t/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10", "warehouse/t/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, |
| // update for "{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80" |
| {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00001"}, |
| }; |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); |
| checkExpected(rs, expected3,"after converting to acid (no compaction with updates)"); |
| |
| //major compaction + check data + files |
| runStatementOnDriver("alter table T compact 'major'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); |
| |
/* Compaction preserves the location of rows w.r.t. buckets/tranches (for now) */
String[][] expected4 = {
| {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", |
| "warehouse/t/base_10000002_v0000028/bucket_00002"}, |
| {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", |
| "warehouse/t/base_10000002_v0000028/bucket_00002"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", |
| "warehouse/t/base_10000002_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10", |
| "warehouse/t/base_10000002_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", |
| "warehouse/t/base_10000002_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", |
| "warehouse/t/base_10000002_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", |
| "warehouse/t/base_10000002_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", |
| "warehouse/t/base_10000002_v0000028/bucket_00000"}, |
| {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88", |
| "warehouse/t/base_10000002_v0000028/bucket_00001"}, |
| }; |
| checkExpected(rs, expected4,"after major compact"); |
| } |
| @Test |
| public void testInsertFromUnion() throws Exception { |
| int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; |
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + makeValuesClause(values)); |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into T(a,b) select a, b from " + Table.NONACIDNONBUCKET + " where a between 1 and 3 group by a, b union all select a, b from " + Table.NONACIDNONBUCKET + " where a between 5 and 7 union all select a, b from " + Table.NONACIDNONBUCKET + " where a >= 9"); |
| List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); |
| LOG.warn("before converting to acid"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| /* |
The number of writers seems to be based on the number of MR jobs for the src query. TODO: check the number of FileSinks
| warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000000_0/delta_0000001_0000001_0000/bucket_00000 [length: 648] |
| {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"_col0":1,"_col1":2}} |
| {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"_col0":2,"_col1":4}} |
| ________________________________________________________________________________________________________________________ |
| warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000001_0/delta_0000001_0000001_0000/bucket_00001 [length: 658] |
| {"operation":0,"originalTransaction":1,"bucket":536936448,"rowId":0,"currentTransaction":1,"row":{"_col0":5,"_col1":6}} |
| {"operation":0,"originalTransaction":1,"bucket":536936448,"rowId":1,"currentTransaction":1,"row":{"_col0":6,"_col1":8}} |
| {"operation":0,"originalTransaction":1,"bucket":536936448,"rowId":2,"currentTransaction":1,"row":{"_col0":9,"_col1":10}} |
| */ |
| rs = runStatementOnDriver("select a, b from T order by a, b"); |
| Assert.assertEquals(stringifyValues(values), rs); |
| rs = runStatementOnDriver("select ROW__ID from T group by ROW__ID having count(*) > 1"); |
| if(rs.size() > 0) { |
| Assert.assertEquals("Duplicate ROW__IDs: " + rs.get(0), 0, rs.size()); |
| } |
| } |
| /** |
| * see HIVE-16177 |
| * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} |
| */ |
| @Test |
| public void testToAcidConversion02() throws Exception { |
//create 2 rows in a file 000000_0
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); |
| //create 4 rows in a file 000000_0_copy_1 |
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,12),(0,13),(1,4),(1,5)"); |
| //create 1 row in a file 000000_0_copy_2 |
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,6)"); |
| |
| //convert the table to Acid //todo: remove trans_prop after HIVE-17089 |
| runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); |
| List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); |
| LOG.warn("before acid ops (after convert)"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
//create some delta directories
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,15),(1,16)"); |
| runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set b = 120 where a = 0 and b = 12"); |
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,17)"); |
| runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 1 and b = 3"); |
| |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b"); |
| LOG.warn("before compact"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); |
| /* |
| * All ROW__IDs are unique on read after conversion to acid |
| * ROW__IDs are exactly the same before and after compaction |
| * Also check the file name (only) after compaction for completeness |
| */ |
| String[][] expected = { |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13", |
| "bucket_00000", "000000_0_copy_1"}, |
| {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t0\t15", |
| "bucket_00000", "bucket_00000_0"}, |
| {"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t0\t17", |
| "bucket_00000", "bucket_00000_0"}, |
| {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t0\t120", |
| "bucket_00000", "bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", |
| "bucket_00000", "000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t1\t4", |
| "bucket_00000", "000000_0_copy_1"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t1\t5", |
| "bucket_00000", "000000_0_copy_1"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6", |
| "bucket_00000", "000000_0_copy_2"}, |
| {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":1}\t1\t16", |
| "bucket_00000", "bucket_00000_0"} |
| }; |
| Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size()); |
| for(int i = 0; i < expected.length; i++) { |
| Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); |
| Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][2])); |
| } |
| //run Compaction |
| runStatementOnDriver("alter table "+ Table.NONACIDNONBUCKET +" compact 'major'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| /* |
| nonacidnonbucket/ |
| ├── 000000_0 |
| ├── 000000_0_copy_1 |
| ├── 000000_0_copy_2 |
| ├── base_0000004 |
| │  └── bucket_00000 |
| ├── delete_delta_0000002_0000002_0000 |
| │  └── bucket_00000 |
| ├── delete_delta_0000004_0000004_0000 |
| │  └── bucket_00000 |
| ├── delta_0000001_0000001_0000 |
| │  └── bucket_00000 |
| ├── delta_0000002_0000002_0000 |
| │  └── bucket_00000 |
| └── delta_0000003_0000003_0000 |
| └── bucket_00000 |
| |
| 6 directories, 9 files |
| */ |
| rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b"); |
| LOG.warn("after compact"); |
| for(String s : rs) { |
| LOG.warn(s); |
| } |
| Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size()); |
| for(int i = 0; i < expected.length; i++) { |
| Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); |
| Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); |
| } |
//ROW__IDs are the same before and after compaction (the same expected[] is used for both checks)
| } |
| /** |
| * Currently CTAS doesn't support bucketed tables. Correspondingly Acid only supports CTAS for |
| * unbucketed tables. This test is here to make sure that if CTAS is made to support unbucketed |
| * tables, that it raises a red flag for Acid. |
| */ |
| @Test |
| public void testCtasBucketed() throws Exception { |
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); |
| CommandProcessorException e = runStatementOnDriverNegative("create table myctas " + |
| "clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true') as " + |
| "select a, b from " + Table.NONACIDORCTBL); |
//ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode() doesn't propagate to the client, so we
//can't assert on it:
// Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
| Assert.assertTrue(e.getMessage().contains("CREATE-TABLE-AS-SELECT does not support")); |
| } |
| /** |
| * Currently CTAS doesn't support partitioned tables. Correspondingly Acid only supports CTAS for |
| * un-partitioned tables. This test is here to make sure that if CTAS is made to support |
| * un-partitioned tables, that it raises a red flag for Acid. |
| */ |
| @Test |
| public void testCtasPartitioned() throws Exception { |
| runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); |
| CommandProcessorException e = runStatementOnDriverNegative("create table myctas partitioned " + |
| "by (b int) stored as " + |
| "ORC TBLPROPERTIES ('transactional'='true') as select a, b from " + Table.NONACIDORCTBL); |
//ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode() doesn't propagate to the client
| Assert.assertTrue(e.getMessage().contains("CREATE-TABLE-AS-SELECT does not support " + |
| "partitioning in the target table")); |
| } |
| /** |
| * Tests to check that we are able to use vectorized acid reader, |
| * VectorizedOrcAcidRowBatchReader, when reading "original" files, |
| * i.e. those that were written before the table was converted to acid. |
| * See also acid_vectorization_original*.q |
| */ |
| @Test |
public void testNonAcidToAcidVectorized() throws Exception {
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); |
| hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); |
| //this enables vectorization of ROW__ID |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631 |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T(a int, b int) stored as orc tblproperties('transactional'='false')"); |
| int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; |
| runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values)); |
| //, 'transactional_properties'='default' |
| runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')"); |
| //Execution mode: vectorized |
| //this uses VectorizedOrcAcidRowBatchReader |
| String query = "select a from T where b > 6 order by a"; |
| List<String> rs = runStatementOnDriver(query); |
| String[][] expected = { |
| {"6", ""}, |
| {"9", ""}, |
| }; |
| checkExpected(rs, expected, "After conversion"); |
| Assert.assertEquals(Integer.toString(6), rs.get(0)); |
| Assert.assertEquals(Integer.toString(9), rs.get(1)); |
| assertVectorized(shouldVectorize(), query); |
| |
//PPD is working, but the storage layer doesn't do row-level filtering, only row-group-level filtering
| //this uses VectorizedOrcAcidRowBatchReader |
| query = "select ROW__ID, a from T where b > 6 order by a"; |
| rs = runStatementOnDriver(query); |
| String[][] expected1 = { |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} |
| }; |
| checkExpected(rs, expected1, "After conversion with VC1"); |
| assertVectorized(shouldVectorize(), query); |
| |
| //this uses VectorizedOrcAcidRowBatchReader |
| query = "select ROW__ID, a from T where b > 0 order by a"; |
| rs = runStatementOnDriver(query); |
| String[][] expected2 = { |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}", "1"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}", "2"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}", "5"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} |
| }; |
| checkExpected(rs, expected2, "After conversion with VC2"); |
| assertVectorized(shouldVectorize(), query); |
| |
//doesn't vectorize (uses neither of the Vectorized Acid readers)
| query = "select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a"; |
| rs = runStatementOnDriver(query); |
| Assert.assertEquals("", 2, rs.size()); |
| String[][] expected3 = { |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"} |
| }; |
| checkExpected(rs, expected3, "After non-vectorized read"); |
| Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); |
//not vectorized, because INPUT__FILE__NAME is selected
| assertVectorized(false, query); |
| |
| runStatementOnDriver("update T set b = 17 where a = 1"); |
| //this should use VectorizedOrcAcidRowReader |
| query = "select ROW__ID, b from T where b > 0 order by a"; |
| rs = runStatementOnDriver(query); |
| String[][] expected4 = { |
| {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}","17"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}","4"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}","6"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}","8"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}","10"} |
| }; |
| checkExpected(rs, expected4, "After conversion with VC4"); |
| assertVectorized(shouldVectorize(), query); |
| |
| runStatementOnDriver("alter table T compact 'major'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); |
| Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); |
| |
| //this should not vectorize at all |
| query = "select ROW__ID, a, b, INPUT__FILE__NAME from T where b > 0 order by a, b"; |
| rs = runStatementOnDriver(query); |
| String[][] expected5 = {//the row__ids are the same after compaction |
| {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t1\t17", |
| "warehouse/t/base_10000001_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4", |
| "warehouse/t/base_10000001_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", |
| "warehouse/t/base_10000001_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8", |
| "warehouse/t/base_10000001_v0000028/bucket_00000"}, |
| {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", |
| "warehouse/t/base_10000001_v0000028/bucket_00000"} |
| }; |
| checkExpected(rs, expected5, "After major compaction"); |
//not vectorized, because INPUT__FILE__NAME is selected
| assertVectorized(false, query); |
| } |
| private void checkExpected(List<String> rs, String[][] expected, String msg) { |
| super.checkExpected(rs, expected, msg, LOG, true); |
| } |
| /** |
| * HIVE-17900 |
| */ |
| @Test |
| public void testCompactStatsGather() throws Exception { |
| hiveConf.setIntVar(HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, -1); |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T(a int, b int) partitioned by (p int, q int) " + |
| "stored as orc TBLPROPERTIES ('transactional'='true')"); |
| |
| int[][] targetVals = {{4, 1, 1}, {4, 2, 2}, {4, 3, 1}, {4, 4, 2}}; |
//we only recompute stats after major compaction if they existed before
| runStatementOnDriver("insert into T partition(p=1,q) " + makeValuesClause(targetVals)); |
| runStatementOnDriver("analyze table T partition(p=1) compute statistics for columns"); |
| |
| IMetaStoreClient hms = Hive.get().getMSC(); |
| List<String> partNames = new ArrayList<>(); |
| partNames.add("p=1/q=2"); |
| List<String> colNames = new ArrayList<>(); |
| colNames.add("a"); |
| Map<String, List<ColumnStatisticsObj>> map = hms.getPartitionColumnStatistics("default", |
| "T", partNames, colNames, Constants.HIVE_ENGINE); |
| Assert.assertEquals(4, map.get(partNames.get(0)).get(0).getStatsData().getLongStats().getHighValue()); |
| |
| int[][] targetVals2 = {{5, 1, 1}, {5, 2, 2}, {5, 3, 1}, {5, 4, 2}}; |
| runStatementOnDriver("insert into T partition(p=1,q) " + makeValuesClause(targetVals2)); |
| |
| String query = "select ROW__ID, p, q, a, b, INPUT__FILE__NAME from T order by p, q, a, b"; |
| List<String> rs = runStatementOnDriver(query); |
| String[][] expected = { |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000_0"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000_0"} |
| }; |
| checkExpected(rs, expected, "insert data"); |
| |
| //run major compaction |
| runStatementOnDriver("alter table T partition(p=1,q=2) compact 'major'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| query = "select ROW__ID, p, q, a, b, INPUT__FILE__NAME from T order by p, q, a, b"; |
| rs = runStatementOnDriver(query); |
| String[][] expected2 = { |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"}, |
| {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"} |
| }; |
| checkExpected(rs, expected2, "after major compaction"); |
| |
| //check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); |
| Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); |
| |
| //now check that stats were updated |
| map = hms.getPartitionColumnStatistics("default","T", partNames, colNames, Constants.HIVE_ENGINE); |
| Assert.assertEquals("", 5, map.get(partNames.get(0)).get(0).getStatsData().getLongStats().getHighValue()); |
| } |
| @Test |
| public void testDefault() throws Exception { |
| hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as orc"); |
| runStatementOnDriver("insert into T values(1,2),(3,4)"); |
| String query = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b"; |
| List<String> rs = runStatementOnDriver(query); |
| String[][] expected = { |
//this proves the data is written in the Acid layout, i.e. T was made Acid
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"}, |
| {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"} |
| }; |
| checkExpected(rs, expected, "insert data"); |
| } |
| /** |
| * see HIVE-18429 |
| */ |
| @Test |
| public void testEmptyCompactionResult() throws Exception { |
| hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as orc"); |
| int[][] data = {{1,2}, {3,4}}; |
| runStatementOnDriver("insert into T" + makeValuesClause(data)); |
| runStatementOnDriver("insert into T" + makeValuesClause(data)); |
| |
//delete the bucket files so that we end up with empty delta dirs
| List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T"); |
| FileSystem fs = FileSystem.get(hiveConf); |
| for(String path : rs) { |
| fs.delete(new Path(path), true); |
| } |
| runStatementOnDriver("alter table T compact 'major'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| //check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); |
| Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); |
| |
//now run another compaction to make sure empty dirs don't cause issues
| runStatementOnDriver("insert into T" + makeValuesClause(data)); |
| runStatementOnDriver("alter table T compact 'major'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| //check status of compaction job |
| resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); |
| for(int i = 0; i < 2; i++) { |
| Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(i).getState()); |
| Assert.assertTrue(resp.getCompacts().get(i).getHadoopJobId().startsWith("job_local")); |
| } |
| rs = runStatementOnDriver("select a, b from T order by a, b"); |
| Assert.assertEquals(stringifyValues(data), rs); |
| } |
| } |
| |