| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.ql; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.hive.common.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.hive.conf.Constants; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl; |
| import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.MetastoreTaskThread; |
| import org.apache.hadoop.hive.metastore.MetaStoreFilterHook; |
| import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; |
| import org.apache.hadoop.hive.metastore.api.CompactionType; |
| import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; |
| import org.apache.hadoop.hive.metastore.api.LockState; |
| import org.apache.hadoop.hive.metastore.api.LockType; |
| import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; |
| import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; |
| import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; |
| import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; |
| import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; |
| import org.apache.hadoop.hive.metastore.api.TxnInfo; |
| import org.apache.hadoop.hive.metastore.api.TxnState; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf; |
| import org.apache.hadoop.hive.metastore.txn.TxnStore; |
| import org.apache.hadoop.hive.metastore.txn.TxnUtils; |
| import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService; |
| import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; |
| import org.apache.hadoop.hive.ql.io.AcidOutputFormat; |
| import org.apache.hadoop.hive.ql.io.AcidUtils; |
| import org.apache.hadoop.hive.ql.io.BucketCodec; |
| import org.apache.hadoop.hive.ql.metadata.Hive; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessorException; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities; |
| import org.apache.thrift.TException; |
| import org.junit.Assert; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Lists; |
| |
| import static java.util.Arrays.asList; |
| import static org.apache.commons.collections.CollectionUtils.isEqualCollection; |
| import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN; |
| |
| /** |
| * The LockManager is not ready, but for the no-concurrency straight-line path we can |
| * test AC=true, and AC=false with commit/rollback/exception, and verify the resulting data. |
| * |
| * Can also test calling commit in AC=true mode, toggling AC, etc. |
| * |
| * Tests here cover multi-statement transactions (WIP) among others, and mostly use |
| * bucketed tables. |
| */ |
| public class TestTxnCommands extends TxnCommandsBaseForTests { |
| private static final Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); |
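| // Per-run scratch directory under java.io.tmpdir; backslashes are normalized to forward |
| // slashes so the path also works on Windows. |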
| private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + |
| File.separator + TestTxnCommands.class.getCanonicalName() + "-" + System.currentTimeMillis() |
| ).getPath().replaceAll("\\\\", "/"); |
| |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| @Override |
| protected String getTestDataDir() { |
| return TEST_DATA_DIR; |
| } |
| |
| @Override |
| void initHiveConf() { |
| super.initHiveConf(); |
| //TestTxnCommandsWithSplitUpdateAndVectorization has the vectorized version |
| //of these tests. |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); |
| HiveConf.setVar(hiveConf, HiveConf.ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict"); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, false); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY, false); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, false); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false); |
| |
| MetastoreConf.setClass(hiveConf, MetastoreConf.ConfVars.FILTER_HOOK, |
| DummyMetaStoreFilterHookImpl.class, MetaStoreFilterHook.class); |
| |
| HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL, |
| new Path(getWarehouseDir(), "ext").toUri().getPath()); |
| } |
| |
| public static class DummyMetaStoreFilterHookImpl extends DefaultMetaStoreFilterHookImpl { |
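| // When a test flips this to true, filterTableNames returns an empty list, simulating a |
| // metastore filter hook that hides every table. |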
| private static boolean blockResults = false; |
| |
| public DummyMetaStoreFilterHookImpl(Configuration conf) { |
| super(conf); |
| } |
| @Override |
| public List<String> filterTableNames(String catName, String dbName, List<String> tableList) { |
| if (blockResults) { |
| return new ArrayList<>(); |
| } |
| return tableList; |
| } |
| } |
| |
| /** |
| * Tests that a failing Insert Overwrite (which creates a new base_x) is properly marked as |
| * aborted. |
| */ |
| @Test |
| public void testInsertOverwrite() throws Exception { |
| runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2); |
| runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2)"); |
| List<String> rs = runStatementOnDriver("select a from " + Table.ACIDTBL + " where b = 2"); |
| Assert.assertEquals(1, rs.size()); |
| Assert.assertEquals("1", rs.get(0)); |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, true); |
| runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " values(3,2)"); |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, false); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6)"); |
| rs = runStatementOnDriver("select a from " + Table.ACIDTBL + " order by a"); |
| Assert.assertEquals(2, rs.size()); |
| Assert.assertEquals("1", rs.get(0)); |
| Assert.assertEquals("5", rs.get(1)); |
| } |
| |
| @Ignore("not needed but useful for testing") |
| @Test |
| public void testNonAcidInsert() throws Exception { |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); |
| List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)"); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); |
| } |
| |
| /** |
| * Useful for debugging. Dumps an ORC file in JSON to the CWD. |
| */ |
| private void dumpBucketData(Table table, long writeId, int stmtId, int bucketNum) throws Exception { |
| if (true) { |
| return; //disabled by default; flip this guard to enable the dump when debugging |
| } |
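| // Bucket files live under <warehouse>/<table>/delta_<writeId>_<writeId>_<stmtId>/bucket_<N>, |
| // as produced by AcidUtils.deltaSubdir() and AcidUtils.createBucketFile() below. |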
| Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(writeId, writeId, stmtId)), bucketNum); |
| FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName()); |
| // try { |
| // FileDump.printJsonData(conf, bucket.toString(), delta); |
| // } |
| // catch(FileNotFoundException ex) { |
| // ; //this happens if you change BUCKET_COUNT |
| // } |
| delta.close(); |
| } |
| /** |
| * Dumps all data in the table, bucket by bucket, in JSON format. |
| */ |
| private void dumpTableData(Table table, long writeId, int stmtId) throws Exception { |
| for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) { |
| dumpBucketData(table, writeId, stmtId, bucketNum); |
| } |
| } |
| @Test |
| public void testSimpleAcidInsert() throws Exception { |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| //List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs); |
| runStatementOnDriver("START TRANSACTION"); |
| int[][] rows2 = {{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); |
| List<String> allData = stringifyValues(rows1); |
| allData.addAll(stringifyValues(rows2)); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0); |
| runStatementOnDriver("COMMIT WORK"); |
| dumpTableData(Table.ACIDTBL, 1, 0); |
| dumpTableData(Table.ACIDTBL, 2, 0); |
| runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| CommandProcessorException e = runStatementOnDriverNegative("COMMIT"); //txn started implicitly by previous statement |
| Assert.assertEquals("Error didn't match: " + e, |
| ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), e.getErrorCode()); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1); |
| } |
| |
| @Test |
| public void testMmExim() throws Exception { |
| String tableName = "mm_table", importName = tableName + "_import"; |
| runStatementOnDriver("drop table if exists " + tableName); |
| runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", |
| tableName)); |
| |
| // Regular insert: export some MM deltas, then import into a new table. |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver(String.format("insert into %s (a,b) %s", |
| tableName, makeValuesClause(rows1))); |
| runStatementOnDriver(String.format("insert into %s (a,b) %s", |
| tableName, makeValuesClause(rows1))); |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName); |
| FileSystem fs = FileSystem.get(hiveConf); |
| Path exportPath = new Path(table.getSd().getLocation() + "_export"); |
| fs.delete(exportPath, true); |
| runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); |
| List<String> paths = listPathsRecursive(fs, exportPath); |
| verifyMmExportPaths(paths, 2); |
| runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); |
| org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName); |
| Assert.assertEquals(imported.toString(), "insert_only", |
| imported.getParameters().get("transactional_properties")); |
| Path importPath = new Path(imported.getSd().getLocation()); |
| FileStatus[] stat = fs.listStatus(importPath, FileUtils.HIDDEN_FILES_PATH_FILTER); |
| Assert.assertEquals(Arrays.toString(stat), 1, stat.length); |
| assertIsDelta(stat[0]); |
| List<String> allData = stringifyValues(rows1); |
| allData.addAll(stringifyValues(rows1)); |
| Collections.sort(allData); |
| List<String> rs = runStatementOnDriver( |
| String.format("select a,b from %s order by a,b", importName)); |
| Assert.assertEquals("After import: " + rs, allData, rs); |
| runStatementOnDriver("drop table if exists " + importName); |
| |
| // Do insert overwrite to create some invalid deltas, and import into a non-MM table. |
| int[][] rows2 = {{5,6},{7,8}}; |
| runStatementOnDriver(String.format("insert overwrite table %s %s", |
| tableName, makeValuesClause(rows2))); |
| fs.delete(exportPath, true); |
| runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); |
| paths = listPathsRecursive(fs, exportPath); |
| verifyMmExportPaths(paths, 1); |
| runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + |
| "TBLPROPERTIES ('transactional'='false')", importName)); |
| runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); |
| imported = msClient.getTable("default", importName); |
| Assert.assertNull(imported.toString(), imported.getParameters().get("transactional")); |
| Assert.assertNull(imported.toString(), |
| imported.getParameters().get("transactional_properties")); |
| importPath = new Path(imported.getSd().getLocation()); |
| stat = fs.listStatus(importPath, FileUtils.HIDDEN_FILES_PATH_FILTER); |
| allData = stringifyValues(rows2); |
| Collections.sort(allData); |
| rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName)); |
| Assert.assertEquals("After import: " + rs, allData, rs); |
| runStatementOnDriver("drop table if exists " + importName); |
| runStatementOnDriver("drop table if exists " + tableName); |
| msClient.close(); |
| } |
| |
| private static final class QueryRunnable implements Runnable { |
| private final CountDownLatch cdlIn, cdlOut; |
| private final String query; |
| private final HiveConf hiveConf; |
| |
| QueryRunnable(HiveConf hiveConf, String query, CountDownLatch cdlIn, CountDownLatch cdlOut) { |
| this.query = query; |
| this.cdlIn = cdlIn; |
| this.cdlOut = cdlOut; |
| this.hiveConf = new HiveConf(hiveConf); |
| } |
| |
| @Override |
| public void run() { |
| SessionState ss = SessionState.start(hiveConf); |
| try { |
| ss.applyAuthorizationPolicy(); |
| } catch (HiveException e) { |
| throw new RuntimeException(e); |
| } |
| QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build(); |
| try (Driver d = new Driver(qs)) { |
| LOG.info("Ready to run the query: " + query); |
| syncThreadStart(cdlIn, cdlOut); |
| try { |
| try { |
| d.run(query); |
| } catch (CommandProcessorException e) { |
| throw new RuntimeException(query + " failed: " + e); |
| } |
| d.getResults(new ArrayList<String>()); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |
| |
| |
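| /** |
| * Two-latch rendezvous: each worker signals readiness on cdlIn, then blocks on cdlOut |
| * until the coordinating test thread releases all workers at once. |
| */ |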
| private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { |
| cdlIn.countDown(); |
| try { |
| cdlOut.await(); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Test |
| public void testParallelInsertStats() throws Exception { |
| final int TASK_COUNT = 4; |
| String tableName = "mm_table"; |
| List<ColumnStatisticsObj> stats; |
| IMetaStoreClient msClient = prepareParallelTest(tableName, 0); |
| |
| String[] queries = new String[TASK_COUNT]; |
| for (int i = 0; i < queries.length; ++i) { |
| queries[i] = String.format("insert into %s (a) values (" + i + ")", tableName); |
| } |
| |
| runParallelQueries(queries); |
| |
| // Verify stats are either invalid, or valid and correct. |
| stats = getTxnTableStats(msClient, tableName); |
| boolean hasStats = 0 != stats.size(); |
| if (hasStats) { |
| verifyLongStats(TASK_COUNT, 0, TASK_COUNT - 1, stats); |
| } |
| |
| runStatementOnDriver(String.format("insert into %s (a) values (" + TASK_COUNT + ")", tableName)); |
| if (!hasStats) { |
| // Stats should still be invalid if they were invalid. |
| stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(0, stats.size()); |
| } |
| |
| // Stats should be valid after analyze. |
| runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName)); |
| verifyLongStats(TASK_COUNT + 1, 0, TASK_COUNT, getTxnTableStats(msClient, tableName)); |
| } |
| |
| private void verifyLongStats(int dvCount, int min, int max, List<ColumnStatisticsObj> stats) { |
| Assert.assertEquals(1, stats.size()); |
| LongColumnStatsData data = stats.get(0).getStatsData().getLongStats(); |
| Assert.assertEquals(min, data.getLowValue()); |
| Assert.assertEquals(max, data.getHighValue()); |
| Assert.assertEquals(dvCount, data.getNumDVs()); |
| } |
| |
| private void runParallelQueries(String[] queries) |
| throws InterruptedException, ExecutionException { |
| ExecutorService executor = Executors.newFixedThreadPool(queries.length); |
| final CountDownLatch cdlIn = new CountDownLatch(queries.length), cdlOut = new CountDownLatch(1); |
| Future<?>[] tasks = new Future[queries.length]; |
| for (int i = 0; i < tasks.length; ++i) { |
| tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut)); |
| } |
| cdlIn.await(); // Wait for all threads to be ready. |
| cdlOut.countDown(); // Release them at the same time. |
| for (int i = 0; i < tasks.length; ++i) { |
| tasks[i].get(); |
| } |
| executor.shutdown(); |
| } |
| |
| private IMetaStoreClient prepareParallelTest(String tableName, int val) |
| throws Exception { |
| hiveConf.setBoolean("hive.stats.autogather", true); |
| hiveConf.setBoolean("hive.stats.column.autogather", true); |
| // Need to close the thread local Hive object so that configuration change is reflected to HMS. |
| Hive.closeCurrent(); |
| runStatementOnDriver("drop table if exists " + tableName); |
| runStatementOnDriver(String.format("create table %s (a int) stored as orc " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", |
| tableName)); |
| runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName)); |
| runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName)); |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| // Stats should be valid after serial inserts. |
| List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(1, stats.size()); |
| return msClient; |
| } |
| |
| @Test |
| public void testAddAndDropConstraintAdvancingWriteIds() throws Exception { |
| |
| String tableName = "constraints_table"; |
| hiveConf.setBoolean("hive.stats.autogather", true); |
| hiveConf.setBoolean("hive.stats.column.autogather", true); |
| // Need to close the thread local Hive object so that configuration change is reflected to HMS. |
| Hive.closeCurrent(); |
| runStatementOnDriver("drop table if exists " + tableName); |
| runStatementOnDriver(String.format("create table %s (a int, b string) stored as orc " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", |
| tableName)); |
| runStatementOnDriver(String.format("insert into %s (a) values (0)", tableName)); |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| String validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| LOG.info("ValidWriteIds before add constraint::"+ validWriteIds); |
| Assert.assertEquals("default.constraints_table:1:9223372036854775807::", validWriteIds); |
| runStatementOnDriver(String.format("alter table %s ADD CONSTRAINT a_PK PRIMARY KEY (`a`) DISABLE NOVALIDATE", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| LOG.info("ValidWriteIds after add constraint primary key::"+ validWriteIds); |
| Assert.assertEquals("default.constraints_table:2:9223372036854775807::", validWriteIds); |
| runStatementOnDriver(String.format("alter table %s CHANGE COLUMN b b STRING NOT NULL", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| LOG.info("ValidWriteIds after add constraint not null::"+ validWriteIds); |
| Assert.assertEquals("default.constraints_table:3:9223372036854775807::", validWriteIds); |
| runStatementOnDriver(String.format("alter table %s ADD CONSTRAINT check1 CHECK (a <= 25)", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| LOG.info("ValidWriteIds after add constraint check::"+ validWriteIds); |
| Assert.assertEquals("default.constraints_table:4:9223372036854775807::", validWriteIds); |
| runStatementOnDriver(String.format("alter table %s ADD CONSTRAINT unique1 UNIQUE (a, b) DISABLE", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| LOG.info("ValidWriteIds after add constraint unique::"+ validWriteIds); |
| Assert.assertEquals("default.constraints_table:5:9223372036854775807::", validWriteIds); |
| |
| LOG.info("ValidWriteIds before drop constraint::"+ validWriteIds); |
| runStatementOnDriver(String.format("alter table %s DROP CONSTRAINT a_PK", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.constraints_table:6:9223372036854775807::", validWriteIds); |
| LOG.info("ValidWriteIds after drop constraint primary key::"+ validWriteIds); |
| runStatementOnDriver(String.format("alter table %s DROP CONSTRAINT check1", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.constraints_table:7:9223372036854775807::", validWriteIds); |
| LOG.info("ValidWriteIds after drop constraint check::"+ validWriteIds); |
| runStatementOnDriver(String.format("alter table %s DROP CONSTRAINT unique1", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.constraints_table:8:9223372036854775807::", validWriteIds); |
| LOG.info("ValidWriteIds after drop constraint unique::"+ validWriteIds); |
| runStatementOnDriver(String.format("alter table %s CHANGE COLUMN b b STRING", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.constraints_table:9:9223372036854775807::", validWriteIds); |
| |
| } |
| |
| /** |
| * If you are disabling or removing this test case, it probably means now we support exchange partition for |
| * transactional tables. If that is the case, we also have to make sure we advance the Write IDs during exchange |
| * partition DDL for transactional tables. You can look at https://github.com/apache/hive/pull/2465 as an example. |
| * @throws Exception |
| */ |
| @Test |
| public void exchangePartitionShouldNotWorkForTransactionalTables() throws Exception { |
| runStatementOnDriver("create database IF NOT EXISTS db1"); |
| runStatementOnDriver("create database IF NOT EXISTS db2"); |
| |
| runStatementOnDriver("CREATE TABLE db1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)"); |
| |
| String tableName = "db2.exchange_part_test2"; |
| runStatementOnDriver(String.format("CREATE TABLE %s (f1 string) PARTITIONED BY (ds STRING) " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')" |
| ,tableName)); |
| |
| runStatementOnDriver("ALTER TABLE db2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')"); |
| |
| try { |
| runStatementOnDriver("ALTER TABLE db1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') " + |
| "WITH TABLE db2.exchange_part_test2"); |
| Assert.fail("Exchange partition should not be allowed for transaction tables" ); |
| }catch(Exception e) { |
| Assert.assertTrue(e.getMessage().contains("Exchange partition is not allowed with transactional tables")); |
| } |
| } |
| |
| @Test |
| public void truncateTableAdvancingWriteId() throws Exception { |
| runStatementOnDriver("create database IF NOT EXISTS trunc_db"); |
| |
| String tableName = "trunc_db.trunc_table"; |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| |
| runStatementOnDriver(String.format("CREATE TABLE %s (f1 string) PARTITIONED BY (ds STRING) " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')" |
| , tableName)); |
| |
| String validWriteIds = msClient.getValidWriteIds(tableName).toString(); |
| LOG.info("ValidWriteIds before truncate table::" + validWriteIds); |
| Assert.assertEquals("trunc_db.trunc_table:0:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver("TRUNCATE TABLE trunc_db.trunc_table"); |
| validWriteIds = msClient.getValidWriteIds(tableName).toString(); |
| LOG.info("ValidWriteIds after truncate table::" + validWriteIds); |
| Assert.assertEquals("trunc_db.trunc_table:1:9223372036854775807::", validWriteIds); |
| |
| } |
| |
| @Test |
| public void testAddAndDropPartitionAdvancingWriteIds() throws Exception { |
| runStatementOnDriver("create database IF NOT EXISTS db1"); |
| |
| String tableName = "db1.add_drop_partition"; |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| |
| runStatementOnDriver(String.format("CREATE TABLE %s (f1 string) PARTITIONED BY (ds STRING) " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')" |
| ,tableName)); |
| |
| String validWriteIds = msClient.getValidWriteIds(tableName).toString(); |
| LOG.info("ValidWriteIds before add partition::"+ validWriteIds); |
| Assert.assertEquals("db1.add_drop_partition:0:9223372036854775807::", validWriteIds); |
| runStatementOnDriver("ALTER TABLE db1.add_drop_partition ADD PARTITION (ds='2013-04-05')"); |
| validWriteIds = msClient.getValidWriteIds(tableName).toString(); |
| LOG.info("ValidWriteIds after add partition::"+ validWriteIds); |
| Assert.assertEquals("db1.add_drop_partition:1:9223372036854775807::", validWriteIds); |
| runStatementOnDriver("ALTER TABLE db1.add_drop_partition DROP PARTITION (ds='2013-04-05')"); |
| validWriteIds = msClient.getValidWriteIds(tableName).toString(); |
| LOG.info("ValidWriteIds after drop partition::"+ validWriteIds); |
| Assert.assertEquals("db1.add_drop_partition:2:9223372036854775807::", validWriteIds); |
| |
| } |
| |
| @Test |
| public void testDDLsAdvancingWriteIds() throws Exception { |
| hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true); |
| |
| String tableName = "alter_table"; |
| runStatementOnDriver("drop table if exists " + tableName); |
| runStatementOnDriver(String.format("create table %s (a int, b string, c BIGINT, d INT) " + |
| "PARTITIONED BY (ds STRING)" + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", |
| tableName)); |
| runStatementOnDriver(String.format("insert into %s (a) values (0)", tableName)); |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| String validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:1:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("alter table %s SET OWNER USER user_name", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:2:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("alter table %s CLUSTERED BY(c) SORTED BY(d) INTO 32 BUCKETS", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:3:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s ADD PARTITION (ds='2013-04-05')", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:4:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s SET SERDEPROPERTIES ('field.delim'='\\u0001')", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:5:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s PARTITION (ds='2013-04-05') SET FILEFORMAT PARQUET", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:6:9223372036854775807::", validWriteIds); |
| |
| // We should not advance the write ID during compaction, since doing so would affect the |
| // performance of materialized views. The assertion below ensures that the write ID does not |
| // advance during compaction. |
| runStatementOnDriver(String.format("ALTER TABLE %s PARTITION (ds='2013-04-05') COMPACT 'minor'", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:6:9223372036854775807::", validWriteIds); |
| |
| //Process the compaction request because otherwise the CONCATENATE (major compaction) command on the same table and |
| // partition would be refused. |
| runWorker(hiveConf); |
| runCleaner(hiveConf); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s PARTITION (ds='2013-04-05') CONCATENATE", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:7:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s SKEWED BY (a) ON (1,2)", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:8:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s SET SKEWED LOCATION (1='hdfs://127.0.0.1:8020/abcd/1')", |
| tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:9:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s NOT SKEWED", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:10:9223372036854775807::", validWriteIds); |
| |
| runStatementOnDriver(String.format("ALTER TABLE %s UNSET SERDEPROPERTIES ('field.delim')", tableName)); |
| validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| Assert.assertEquals("default.alter_table:11:9223372036854775807::", validWriteIds); |
| |
| } |
| |
| @Test |
| public void testParallelInsertAnalyzeStats() throws Exception { |
| String tableName = "mm_table"; |
| List<ColumnStatisticsObj> stats; |
| IMetaStoreClient msClient = prepareParallelTest(tableName, 0); |
| |
| String[] queries = { |
| String.format("insert into %s (a) values (999)", tableName), |
| String.format("analyze table %s compute statistics for columns", tableName) |
| }; |
| runParallelQueries(queries); |
| |
| // Verify stats are either invalid, or valid and correct. |
| stats = getTxnTableStats(msClient, tableName); |
| boolean hasStats = 0 != stats.size(); |
| if (hasStats) { |
| verifyLongStats(2, 0, 999, stats); |
| } |
| |
| runStatementOnDriver(String.format("insert into %s (a) values (1000)", tableName)); |
| if (!hasStats) { |
| // Stats should still be invalid if they were invalid. |
| stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(0, stats.size()); |
| } |
| |
| // Stats should be valid after analyze. |
| runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName)); |
| verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName)); |
| } |
| |
| @Test |
| public void testParallelTruncateAnalyzeStats() throws Exception { |
| String tableName = "mm_table"; |
| List<ColumnStatisticsObj> stats; |
| IMetaStoreClient msClient = prepareParallelTest(tableName, 0); |
| |
| String[] queries = { |
| String.format("truncate table %s", tableName), |
| String.format("analyze table %s compute statistics for columns", tableName) |
| }; |
| runParallelQueries(queries); |
| |
| // Verify stats are either invalid, or valid and correct. |
| stats = getTxnTableStats(msClient, tableName); |
| boolean hasStats = 0 != stats.size(); |
| if (hasStats) { |
| // Either the truncate ran before the analyze, or the analyze ran first. |
| if (stats.get(0).getStatsData().getLongStats().getNumDVs() > 0) { |
| verifyLongStats(1, 0, 0, stats); |
| } else { |
| verifyLongStats(0, 0, 0, stats); |
| } |
| } |
| |
| // Stats should be valid after analyze. |
| runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName)); |
| verifyLongStats(0, 0, 0, getTxnTableStats(msClient, tableName)); |
| } |
| |
| |
| @Test |
| public void testTxnStatsOnOff() throws Exception { |
| String tableName = "mm_table"; |
| hiveConf.setBoolean("hive.stats.autogather", true); |
| hiveConf.setBoolean("hive.stats.column.autogather", true); |
| // Need to close the thread local Hive object so that configuration change is reflected to HMS. |
| Hive.closeCurrent(); |
| runStatementOnDriver("drop table if exists " + tableName); |
| runStatementOnDriver(String.format("create table %s (a int) stored as orc " + |
| "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", |
| tableName)); |
| |
| runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName)); |
| IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); |
| List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(1, stats.size()); |
| runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName)); |
| stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(1, stats.size()); |
| msClient.close(); |
| hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false); |
| msClient = new HiveMetaStoreClient(hiveConf); |
| // Even though the stats are valid in metastore, txn stats are disabled. |
| stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(0, stats.size()); |
| msClient.close(); |
| hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true); |
| msClient = new HiveMetaStoreClient(hiveConf); |
| stats = getTxnTableStats(msClient, tableName); |
| // Now the stats are visible again. |
| Assert.assertEquals(1, stats.size()); |
| msClient.close(); |
| hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false); |
| // Need to close the thread local Hive object so that configuration change is reflected to HMS. |
| Hive.closeCurrent(); |
| // Running the query with stats disabled will cause stats in metastore itself to become invalid. |
| runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName)); |
| hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true); |
| msClient = new HiveMetaStoreClient(hiveConf); |
| stats = getTxnTableStats(msClient, tableName); |
| Assert.assertEquals(0, stats.size()); |
| msClient.close(); |
| } |
| |
| public List<ColumnStatisticsObj> getTxnTableStats(IMetaStoreClient msClient, |
| String tableName) throws TException { |
| String validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); |
| return msClient.getTableColumnStatistics( |
| "default", tableName, Lists.newArrayList("a"), Constants.HIVE_ENGINE, validWriteIds); |
| } |
| |
| private void assertIsDelta(FileStatus stat) { |
| Assert.assertTrue(stat.toString(), |
| stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)); |
| } |
| |
| private void verifyMmExportPaths(List<String> paths, int deltasOrBases) { |
| // 1 file, 1 dir for each, for now. Plus export "data" dir. |
| // This could be changed to a flat file list later. |
| Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size()); |
| // No confusing directories in export. |
| for (String path : paths) { |
| Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX)); |
| Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX)); |
| } |
| } |
| |
| private List<String> listPathsRecursive(FileSystem fs, Path path) throws IOException { |
| List<String> paths = new ArrayList<>(); |
| LinkedList<Path> queue = new LinkedList<>(); |
| queue.add(path); |
| while (!queue.isEmpty()) { |
| Path next = queue.pollFirst(); |
| FileStatus[] stats = fs.listStatus(next, FileUtils.HIDDEN_FILES_PATH_FILTER); |
| for (FileStatus stat : stats) { |
| Path child = stat.getPath(); |
| paths.add(child.toString()); |
| if (stat.isDirectory()) { |
| queue.add(child); |
| } |
| } |
| } |
| return paths; |
| } |
| |
| |
| /** |
| * TODO: add tests for all transitions, e.g. AC=t, AC=t, AC=f, commit. |
| * @throws Exception |
| */ |
| @Test |
| public void testErrors() throws Exception { |
| runStatementOnDriver("start transaction"); |
| CommandProcessorException e1 = runStatementOnDriverNegative("create table foo(x int, y int)"); |
| Assert.assertEquals("Expected DDL to fail in an open txn", |
| ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), e1.getErrorCode()); |
| CommandProcessorException e2 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1"); |
| Assert.assertEquals("Expected update of bucket column to fail", |
| "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. Column a.", |
| e2.getMessage()); |
| Assert.assertEquals("Expected update of bucket column to fail", |
| ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), e2.getErrorCode()); |
| CommandProcessorException e3 = runStatementOnDriverNegative("commit"); //not allowed in w/o tx |
| Assert.assertEquals("Error didn't match: " + e3, |
| ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), e3.getErrorCode()); |
| CommandProcessorException e4 = runStatementOnDriverNegative("rollback"); //not allowed in w/o tx |
| Assert.assertEquals("Error didn't match: " + e4, |
| ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), e4.getErrorCode()); |
| runStatementOnDriver("start transaction"); |
| CommandProcessorException e5 = runStatementOnDriverNegative("start transaction"); //not allowed in a tx |
| Assert.assertEquals("Expected start transaction to fail", |
| ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), e5.getErrorCode()); |
| runStatementOnDriver("start transaction"); //ok since previously opened txn was killed |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Can't see my own write", 1, rs0.size()); |
| runStatementOnDriver("commit work"); |
| rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Can't see my own write", 1, rs0.size()); |
| } |
| |
| @Test |
| public void testReadMyOwnInsert() throws Exception { |
| runStatementOnDriver("START TRANSACTION"); |
| List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL); |
| Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size()); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Can't see my own write", 1, rs0.size()); |
| runStatementOnDriver("commit"); |
| runStatementOnDriver("START TRANSACTION"); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| runStatementOnDriver("rollback work"); |
| Assert.assertEquals("Can't see write after commit", 1, rs1.size()); |
| } |
| @Test |
| public void testImplicitRollback() throws Exception { |
| runStatementOnDriver("START TRANSACTION"); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Can't see my own write", 1, rs0.size()); |
| //next command should produce an error |
| CommandProcessorException e = runStatementOnDriverNegative("select * from no_such_table"); |
| Assert.assertEquals("Txn didn't fail?", |
| "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'", |
| e.getMessage()); |
| runStatementOnDriver("start transaction"); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| runStatementOnDriver("commit"); |
| Assert.assertEquals("Didn't rollback as expected", 0, rs1.size()); |
| } |
| @Test |
| public void testExplicitRollback() throws Exception { |
| runStatementOnDriver("START TRANSACTION"); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); |
| runStatementOnDriver("ROLLBACK"); |
| List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Rollback didn't rollback", 0, rs.size()); |
| } |
| |
| @Test |
| public void testMultipleInserts() throws Exception { |
| runStatementOnDriver("START TRANSACTION"); |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| int[][] rows2 = {{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); |
| List<String> allData = stringifyValues(rows1); |
| allData.addAll(stringifyValues(rows2)); |
| List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Content didn't match before commit rs", allData, rs); |
| runStatementOnDriver("commit"); |
| dumpTableData(Table.ACIDTBL, 1, 0); |
| dumpTableData(Table.ACIDTBL, 1, 1); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Content didn't match after commit rs1", allData, rs1); |
| } |
| |
| @Test |
| public void testDeleteOfMultipleInserts() throws Exception { |
| runStatementOnDriver("START TRANSACTION"); |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| int[][] rows2 = {{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); |
| runStatementOnDriver("commit"); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 2"); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8"); |
| List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| int[][] remain = {{3,4},{5,6}}; |
| Assert.assertEquals("Content didn't match after delete ", stringifyValues(remain), rs2); |
| } |
| |
| @Test |
| public void testDelete() throws Exception { |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); |
| runStatementOnDriver("START TRANSACTION"); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4"); |
| int[][] updatedData2 = {{1,2}}; |
| List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3); |
| runStatementOnDriver("commit"); |
| List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4); |
| } |
| |
| @Test |
| public void testUpdateOfInserts() throws Exception { |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); |
| runStatementOnDriver("START TRANSACTION"); |
| int[][] rows2 = {{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| List<String> allData = stringifyValues(rows1); |
| allData.addAll(stringifyValues(rows2)); |
| Assert.assertEquals("Content didn't match rs1", allData, rs1); |
| runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1"); |
| int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}}; |
| List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2); |
| runStatementOnDriver("commit"); |
| List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4); |
| } |
| @Test |
| public void testUpdateDeleteOfInserts() throws Exception { |
| int[][] rows1 = {{1,2},{3,4}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); |
| runStatementOnDriver("START TRANSACTION"); |
| int[][] rows2 = {{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); |
| List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| List<String> allData = stringifyValues(rows1); |
| allData.addAll(stringifyValues(rows2)); |
| Assert.assertEquals("Content didn't match rs1", allData, rs1); |
| runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1"); |
| int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}}; |
| List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1"); |
| dumpTableData(Table.ACIDTBL, 1, 0); |
| dumpTableData(Table.ACIDTBL, 2, 0); |
| dumpTableData(Table.ACIDTBL, 2, 2); |
| dumpTableData(Table.ACIDTBL, 2, 4); |
| int[][] updatedData2 = {{1,1},{3,1},{5,1}}; |
| List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3); |
| runStatementOnDriver("commit"); |
| List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4); |
| } |
| @Test |
| public void testMultipleDelete() throws Exception { |
| int[][] rows1 = {{1,2},{3,4},{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); |
| List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); |
| runStatementOnDriver("START TRANSACTION"); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8"); |
| int[][] updatedData2 = {{1,2},{3,4},{5,6}}; |
| List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs2); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4"); |
| int[][] updatedData3 = {{1, 2}, {5, 6}}; |
| List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after delete2", stringifyValues(updatedData3), rs3); |
| runStatementOnDriver("update " + Table.ACIDTBL + " set b=3"); |
| dumpTableData(Table.ACIDTBL, 1, 0); |
| //nothing actually hashes to bucket0, so update/delete deltas don't have it |
| dumpTableData(Table.ACIDTBL, 2, 0); |
| dumpTableData(Table.ACIDTBL, 2, 2); |
| dumpTableData(Table.ACIDTBL, 2, 4); |
| List<String> rs5 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| int [][] updatedData4 = {{1,3},{5,3}}; |
| Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData4), rs5); |
| runStatementOnDriver("commit"); |
| List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4); |
| } |
| @Test |
| public void testDeleteIn() throws Exception { |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where a IN (SELECT A.a from " + |
| Table.ACIDTBL + " A)"); |
| int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); |
| runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)"); |
| // runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")"); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.ACIDTBL2 + ")"); |
| // runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")"); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBL2); |
| List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}}; |
| Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs); |
| } |
| @Test |
| public void testTimeOutReaper() throws Exception { |
| runStatementOnDriver("start transaction"); |
| runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5"); |
| //make sure currently running txn is considered aborted by housekeeper |
| hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS); |
| MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService(); |
| houseKeeperService.setConf(hiveConf); |
| //this will abort the txn |
| houseKeeperService.run(); |
| //this should fail because txn aborted due to timeout |
| CommandProcessorException e = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5"); |
| Assert.assertTrue("Actual: " + e.getMessage(), |
| e.getMessage().contains("Transaction manager has aborted the transaction txnid:1")); |
| |
| //now test that we don't timeout locks we should not |
| //heartbeater should be running in the background every 1/2 second |
| hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); |
| // Have to reset the conf when we change it so that the change takes effect |
| houseKeeperService.setConf(hiveConf); |
| runStatementOnDriver("start transaction"); |
| runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17"); |
| pause(750); |
| |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| |
| //since there is txn open, we are heartbeating the txn not individual locks |
| GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo(); |
| Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size()); |
| TxnInfo txnInfo = null; |
| for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) { |
| if(ti.getState() == TxnState.OPEN) { |
| txnInfo = ti; |
| break; |
| } |
| } |
| Assert.assertNotNull(txnInfo); |
| Assert.assertEquals(16, txnInfo.getId()); |
| Assert.assertEquals(TxnState.OPEN, txnInfo.getState()); |
| String s = TestTxnDbUtil |
| .queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); |
| String[] vals = s.split("\\s+"); |
| Assert.assertEquals("Didn't get expected timestamps", 2, vals.length); |
| long lastHeartbeat = Long.parseLong(vals[1]); |
| //these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we |
| //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec |
| Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat); |
| |
| ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest()); |
| TestTxnDbUtil.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks()); |
| pause(750); |
| houseKeeperService.run(); |
| pause(750); |
| slr = txnHandler.showLocks(new ShowLocksRequest()); |
| Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size()); |
| TestTxnDbUtil.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks()); |
| |
| pause(750); |
| houseKeeperService.run(); |
| slr = txnHandler.showLocks(new ShowLocksRequest()); |
| Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size()); |
| TestTxnDbUtil.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks()); |
| |
| //should've done several heartbeats |
| s = TestTxnDbUtil.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); |
| vals = s.split("\\s+"); |
| Assert.assertEquals("Didn't get expected timestamps", 2, vals.length); |
| Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")", |
| lastHeartbeat < Long.parseLong(vals[1])); |
| |
| runStatementOnDriver("rollback"); |
| slr = txnHandler.showLocks(new ShowLocksRequest()); |
| Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size()); |
| } |
| private static void pause(int timeMillis) { |
| try { |
| Thread.sleep(timeMillis); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); //restore the interrupt flag rather than swallowing it |
| } |
| } |
| |
| @Test |
| public void exchangePartition() throws Exception { |
| runStatementOnDriver("create database ex1"); |
| runStatementOnDriver("create database ex2"); |
| |
| runStatementOnDriver("CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)"); |
| runStatementOnDriver("CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)"); |
| runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')"); |
| runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2"); |
| } |
| @Test |
| public void testMergeNegative() throws Exception { |
| CommandProcessorException e = runStatementOnDriverNegative( |
| "MERGE INTO " + Table.ACIDTBL + " target\n" + |
| "USING " + Table.NONACIDORCTBL + " source ON target.a = source.a\n" + |
| "WHEN MATCHED THEN UPDATE set b = 1\n" + |
| "WHEN MATCHED THEN DELETE\n" + |
| "WHEN NOT MATCHED AND a < 1 THEN INSERT VALUES(1,2)"); |
| Assert.assertEquals(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ((HiveException)e.getCause()).getCanonicalErrorMsg()); |
| } |
| @Test |
| public void testMergeNegative2() throws Exception { |
| CommandProcessorException e = runStatementOnDriverNegative( |
| "MERGE INTO "+ Table.ACIDTBL + |
| " target USING " + Table.NONACIDORCTBL + "\n source ON target.pk = source.pk " + |
| "\nWHEN MATCHED THEN UPDATE set b = 1 " + |
| "\nWHEN MATCHED THEN UPDATE set b=a"); |
| Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)e.getCause()).getCanonicalErrorMsg()); |
| } |
| |
| /** |
| * `1` means 1 is a column name and '1' means 1 is a string literal |
| * HiveConf.HIVE_QUOTEDID_SUPPORT |
| * HiveConf.HIVE_SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES |
| * {@link TestTxnCommands#testMergeType2SCD01()} |
| */ |
| @Test |
| public void testQuotedIdentifier() throws Exception { |
| String target = "`aci/d_u/ami`"; |
| String src = "`src/name`"; |
| runStatementOnDriver("drop table if exists " + target); |
| runStatementOnDriver("drop table if exists " + src); |
| runStatementOnDriver("create table " + target + "(i int," + |
| "`d?*de e` decimal(5,2)," + |
| "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("create table " + src + "(gh int, j decimal(5,2), k varchar(128))"); |
| runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + |
| "\nwhen matched and i > 5 then delete " + |
| "\nwhen matched then update set vc='blah' " + |
| "\nwhen not matched then insert values(1,2.1,'baz')"); |
| runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + |
| "\nwhen matched and i > 5 then delete " + |
| "\nwhen matched then update set vc='blah', `d?*de e` = current_timestamp() " + |
| "\nwhen not matched then insert values(1,2.1, concat('baz', current_timestamp()))"); |
| runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + |
| "\nwhen matched and i > 5 then delete " + |
| "\nwhen matched then update set vc='blah' " + |
| "\nwhen not matched then insert values(1,2.1,'a\\b')"); |
| runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + |
| "\nwhen matched and i > 5 then delete " + |
| "\nwhen matched then update set vc='∆∋'" + |
| "\nwhen not matched then insert values(`a/b`.gh,`a/b`.j,'c\\t')"); |
| } |
| @Test |
| public void testQuotedIdentifier2() throws Exception { |
| String target = "`aci/d_u/ami`"; |
| String src = "`src/name`"; |
| runStatementOnDriver("drop table if exists " + target); |
| runStatementOnDriver("drop table if exists " + src); |
| runStatementOnDriver("create table " + target + "(i int," + |
| "`d?*de e` decimal(5,2)," + |
| "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))"); |
| runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" + |
| "\nwhen matched and `g/h` > 5 then delete " + |
| "\nwhen matched and `g/h` < 0 then update set vc='∆∋', `d?*de e` = `d?*de e` * j + 1" + |
| "\nwhen not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)"); |
| runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" + |
| "\nwhen matched and `g/h` > 5 then delete" + |
| "\n when matched and `g/h` < 0 then update set vc='∆∋' , `d?*de e` = `d?*de e` * j + 1 " + |
| "\n when not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)"); |
| } |
| /** |
| * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy |
| * also test QuotedIdentifier inside source expression |
| * {@link TestTxnCommands#testQuotedIdentifier()} |
| * {@link TestTxnCommands#testQuotedIdentifier2()} |
| */ |
| @Test |
| public void testMergeType2SCD01() throws Exception { |
| runStatementOnDriver("drop table if exists target"); |
| runStatementOnDriver("drop table if exists source"); |
| runStatementOnDriver("drop table if exists splitTable"); |
| |
| runStatementOnDriver("create table splitTable(op int)"); |
| runStatementOnDriver("insert into splitTable values (0),(1)"); |
| runStatementOnDriver("create table source (key int, data int)"); |
| runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}}; |
| runStatementOnDriver("insert into target " + makeValuesClause(targetVals)); |
| int[][] sourceVals = {{1, 7}, {3, 8}}; |
| runStatementOnDriver("insert into source " + makeValuesClause(sourceVals)); |
| //augment source with a col which has 1 if it will cause an update in target, 0 otherwise |
| String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key"; |
| //split each row (duplicate) which will cause an update into 2 rows and augment with 'op' col which has 0 to insert, 1 to update |
| String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end `o/p\\n` from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1"; |
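//For this data teeCurMatch yields (1,7,1,0),(1,7,1,1),(3,8,0,0): the matched source row (1,7)
//is duplicated so that one copy (o/p=1) closes the current target version and the other (o/p=0)
//inserts the new version, while the unmatched row (3,8) appears once and is simply inserted.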
| if(false) { |
| //this is just for debug |
| List<String> r1 = runStatementOnDriver(curMatch); |
| List<String> r2 = runStatementOnDriver(teeCurMatch); |
| } |
| String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " + |
| "when matched then update set cur=0 " + |
| "when not matched then insert values(s.key,s.data,1)"; |
| //to allow cross join from 'teeCurMatch' |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); |
| runStatementOnDriver(stmt); |
| int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; |
| List<String> r = runStatementOnDriver("select * from target order by key,data,cur"); |
| Assert.assertEquals(stringifyValues(resultVals), r); |
| } |
| /** |
| * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy |
| * Same as testMergeType2SCD01 but with a more intuitive "source" expression |
| */ |
| @Test |
| public void testMergeType2SCD02() throws Exception { |
| runStatementOnDriver("drop table if exists target"); |
| runStatementOnDriver("drop table if exists source"); |
| runStatementOnDriver("create table source (key int, data int)"); |
| runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}}; |
| runStatementOnDriver("insert into target " + makeValuesClause(targetVals)); |
| int[][] sourceVals = {{1, 7}, {3, 8}}; |
| runStatementOnDriver("insert into source " + makeValuesClause(sourceVals)); |
| |
| String baseSrc = "select source.*, 0 c from source " + |
| "union all " + |
| "select source.*, 1 c from source " + |
| "inner join target " + |
| "on source.key=target.key where target.cur=1"; |
| if(false) { |
| //this is just for debug |
| List<String> r1 = runStatementOnDriver(baseSrc); |
| List<String> r2 = runStatementOnDriver( |
| "select t.*, s.* from target t right outer join (" + baseSrc + ") s " + |
| "\non t.key=s.key and t.cur=s.c and t.cur=1"); |
| } |
| String stmt = "merge into target t using " + |
| "(" + baseSrc + ") s " + |
| "on t.key=s.key and t.cur=s.c and t.cur=1 " + |
| "when matched then update set cur=0 " + |
| "when not matched then insert values(s.key,s.data,1)"; |
| |
| runStatementOnDriver(stmt); |
| int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; |
| List<String> r = runStatementOnDriver("select * from target order by key,data,cur"); |
| Assert.assertEquals(stringifyValues(resultVals), r); |
| } |
| |
| @Test |
| public void testMergeOnTezEdges() throws Exception { |
| String query = "merge into " + Table.ACIDTBL + |
| " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + |
| "WHEN MATCHED AND s.a > 8 THEN DELETE " + |
| "WHEN MATCHED THEN UPDATE SET b = 7 " + |
| "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; |
| d.destroy(); |
| HiveConf hc = new HiveConf(hiveConf); |
| hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); |
| hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); |
| d = new Driver(hc); |
| d.setMaxRows(10000); |
| |
| List<String> explain = runStatementOnDriver("explain " + query); |
| StringBuilder sb = new StringBuilder(); |
| for(String s : explain) { |
| sb.append(s).append('\n'); |
| } |
| LOG.info("Explain1: " + sb); |
| /* |
| Edges: |
| Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE) |
| Reducer 3 <- Reducer 2 (SIMPLE_EDGE) |
| Reducer 4 <- Reducer 2 (SIMPLE_EDGE) |
| Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) |
| Reducer 6 <- Reducer 2 (SIMPLE_EDGE) |
| Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) |
| */ |
| for(int i = 0; i < explain.size(); i++) { |
| if(explain.get(i).contains("Edges:")) { |
| Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), |
| explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)")); |
| Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), |
| explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)")); |
| Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), |
| explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)")); |
| Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), |
| explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)")); |
| Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), |
| explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (SIMPLE_EDGE)")); |
| Assert.assertTrue("At i+1=" + (i+6) + explain.get(i + 6), |
| explain.get(i + 6).contains("Reducer 7 <- Reducer 2 (SIMPLE_EDGE)")); |
| break; |
| } |
| } |
| } |
| @Test |
| public void testMergeUpdateDelete() throws Exception { |
| int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}}; |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd)); |
| int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); |
| String query = "merge into " + Table.ACIDTBL + |
| " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + |
| "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + //updates (2,1) -> (2,0) |
| "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +//deletes (4,3) |
| "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; //inserts (11,11) |
| runStatementOnDriver(query); |
| |
| List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}}; |
| Assert.assertEquals(stringifyValues(rExpected), r); |
| } |
| @Test |
| public void testMergeUpdateDeleteNoCardCheck() throws Exception { |
| d.destroy(); |
| HiveConf hc = new HiveConf(hiveConf); |
| hc.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, false); |
| d = new Driver(hc); |
| d.setMaxRows(10000); |
| |
| int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}}; |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd)); |
| int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); |
| String query = "merge into " + Table.ACIDTBL + |
| " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + |
| "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + |
| "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE "; |
| runStatementOnDriver(query); |
| |
| List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| int[][] rExpected = {{2,0},{5,6},{7,8}}; |
| Assert.assertEquals(stringifyValues(rExpected), r); |
| } |
| @Test |
| public void testMergeDeleteUpdate() throws Exception { |
| int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}}; |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals)); |
| int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); |
| String query = "merge into " + Table.ACIDTBL + |
| " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + |
| "WHEN MATCHED and s.a < 5 THEN DELETE " + |
| "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + |
| "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; |
| runStatementOnDriver(query); |
| |
| List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); |
| int[][] rExpected = {{5,6},{7,8},{11,11}}; |
| Assert.assertEquals(stringifyValues(rExpected), r); |
| } |
| |
| /** |
| * see https://issues.apache.org/jira/browse/HIVE-14949 for details |
| * @throws Exception |
| */ |
| @Test |
| public void testMergeCardinalityViolation() throws Exception { |
| int[][] sourceVals = {{2,2},{2,44},{5,5},{11,11}}; |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals)); |
| int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); |
| String query = "merge into " + Table.ACIDTBL + |
| " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + |
| "WHEN MATCHED and s.a < 5 THEN DELETE " + |
| "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + |
| "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; |
| runStatementOnDriverNegative(query); |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')"); |
| query = "merge into " + Table.ACIDTBLPART + |
| " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + |
| "WHEN MATCHED and s.a < 5 THEN DELETE " + |
| "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + |
| "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b, 'p1') "; |
| runStatementOnDriverNegative(query); |
| } |
| @Test |
| public void testSetClauseFakeColumn() throws Exception { |
| CommandProcessorException e1 = runStatementOnDriverNegative( |
| "MERGE INTO "+ Table.ACIDTBL + " target\n" + |
| "USING " + Table.NONACIDORCTBL + "\n" + |
| " source ON target.a = source.a\n" + |
| "WHEN MATCHED THEN UPDATE set t = 1"); |
| Assert.assertEquals(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, |
| ((HiveException)e1.getCause()).getCanonicalErrorMsg()); |
| |
| CommandProcessorException e2 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set t = 1"); |
| Assert.assertEquals(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, |
| ((HiveException)e2.getCause()).getCanonicalErrorMsg()); |
| } |
| |
| @Test |
| public void testBadOnClause() throws Exception { |
| CommandProcessorException e = |
| runStatementOnDriverNegative( |
| "merge into " + Table.ACIDTBL + " trgt\n" + |
| "using (select *\n" + |
| " from " + Table.NONACIDORCTBL + " src) sub on sub.a = target.a\n" + |
| "when not matched then insert values (sub.a,sub.b)"); |
| Assert.assertTrue("Error didn't match: " + e, e.getMessage().contains( |
| "No columns from target table 'trgt' found in ON clause '`sub`.`a` = `target`.`a`' of MERGE statement.")); |
| } |
| |
| /** |
* Writing unit tests that need multiple threads is challenging since Derby chokes on concurrent access.
* This tests that "AND WAIT" actually blocks and responds to interrupt.
* @throws Exception
| */ |
| @Test |
| public void testCompactionBlocking() throws Exception { |
| Timer cancelCompact = new Timer("CancelCompactionTimer", false); |
final Thread threadToInterrupt = Thread.currentThread();
| cancelCompact.schedule(new TimerTask() { |
| @Override |
| public void run() { |
| threadToInterrupt.interrupt(); |
| } |
| }, 5000); |
| long start = System.currentTimeMillis(); |
| runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major' AND WAIT"); |
| //no Worker so it stays in initiated state |
//w/o AND WAIT the above alter table returns almost immediately, so the test here checks that
//more than 2 seconds pass, i.e. that the command in Driver actually blocks before cancel is fired
Assert.assertTrue(System.currentTimeMillis() > start + 2000);
| } |
| |
| @Test |
| public void testMergeCase() throws Exception { |
| runStatementOnDriver("create table merge_test (c1 integer, c2 integer, c3 integer) CLUSTERED BY (c1) into 2 buckets stored as orc tblproperties(\"transactional\"=\"true\")"); |
| runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)"); |
| runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))"); |
| } |
| /** |
| * HIVE-16177 |
| * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} |
| */ |
| @Test |
| public void testNonAcidToAcidConversion01() throws Exception { |
| //create 1 row in a file 000001_0 (and an empty 000000_0) |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); |
//create 1 row in a file 000000_0_copy_1 and 1 row in a file 000001_0_copy_1
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)"); |
| |
| //convert the table to Acid |
| runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); |
| //create a delta directory |
| runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)"); |
| |
| boolean isVectorized = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); |
| String query = "select ROW__ID, a, b" + (isVectorized ? " from " : ", INPUT__FILE__NAME from ") + Table.NONACIDORCTBL + " order by ROW__ID"; |
| String[][] expected = new String[][] { |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/000001_0"}, |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/000001_0_copy_1"}, |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/000001_0_copy_1"}, |
| {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001_0"} |
| }; |
| checkResult(expected, query, isVectorized, "before compact", LOG); |
| |
| Assert.assertEquals(536870912, |
| BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0))); |
| Assert.assertEquals(536936448, |
| BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1))); |
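//BucketCodec.V1 stores the codec version in the top 3 bits and the bucket id starting at bit 16:
//version 1 alone gives 1 << 29 = 536870912 (bucket 0), and bucket 1 adds 1 << 16 = 65536,
//i.e. 536870912 + 65536 = 536936448 — matching the "bucketid" values in the ROW__IDs above.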
| |
| //run Compaction |
| runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'major'"); |
| runWorker(hiveConf); |
| |
| query = "select ROW__ID, a, b" + (isVectorized ? "" : ", INPUT__FILE__NAME") + " from " |
| + Table.NONACIDORCTBL + " order by ROW__ID"; |
| String[][] expected2 = new String[][] { |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000021/bucket_00001"}, |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000021/bucket_00001"}, |
| {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000021/bucket_00001"}, |
| {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000021/bucket_00001"} |
| }; |
| checkResult(expected2, query, isVectorized, "after major compact", LOG); |
| //make sure they are the same before and after compaction |
| } |
| //@Ignore("see bucket_num_reducers_acid.q") |
| @Test |
| public void testMoreBucketsThanReducers() throws Exception { |
| //see bucket_num_reducers.q bucket_num_reducers2.q |
| // todo: try using set VerifyNumReducersHook.num.reducers=10; |
| d.destroy(); |
| HiveConf hc = new HiveConf(hiveConf); |
| hc.setIntVar(HiveConf.ConfVars.MAX_REDUCERS, 1); |
| //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others |
| hc.setIntVar(HiveConf.ConfVars.HADOOP_NUM_REDUCERS, 1); |
| hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); |
| d = new Driver(hc); |
| d.setMaxRows(10000); |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1)"); //txn X write to bucket1 |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " values(0,0),(3,3)"); // txn X + 1 write to bucket0 + bucket1 |
| runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1"); |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL + " order by a, b"); |
| int[][] expected = {{0, -1}, {1, -1}, {3, -1}}; |
| Assert.assertEquals(stringifyValues(expected), r); |
| } |
| @Ignore("Moved to Tez") |
| @Test |
| public void testMoreBucketsThanReducers2() throws Exception { |
| //todo: try using set VerifyNumReducersHook.num.reducers=10; |
| //see bucket_num_reducers.q bucket_num_reducers2.q |
| d.destroy(); |
| HiveConf hc = new HiveConf(hiveConf); |
| hc.setIntVar(HiveConf.ConfVars.MAX_REDUCERS, 2); |
| //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others |
| hc.setIntVar(HiveConf.ConfVars.HADOOP_NUM_REDUCERS, 2); |
| d = new Driver(hc); |
| d.setMaxRows(10000); |
| runStatementOnDriver("create table fourbuckets (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
//below, column a is the bucket id and column b is (logically) the txn id
| runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)"); //txn X write to b0 + b1 |
| runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)"); // txn X + 1 write to b2 + b3 |
| runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)"); //txn X + 2 write to b0 + b1 |
| runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)"); //txn X + 3 write to b2 + b3 |
//so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by ROW__ID where txnid is the first component
| //FS2 should see (1,1),(3,2),(1,3),(3,4) |
| |
| runStatementOnDriver("update fourbuckets set b = -1"); |
| List<String> r = runStatementOnDriver("select * from fourbuckets order by a, b"); |
| int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}}; |
| Assert.assertEquals(stringifyValues(expected), r); |
| } |
| @Test |
| public void testVersioningVersionFileEnabled() throws Exception { |
| acidVersionTest(true); |
| } |
| |
| @Test |
| public void testVersioningVersionFileDisabled() throws Exception { |
| acidVersionTest(false); |
| } |
| |
| private void acidVersionTest(boolean enableVersionFile) throws Exception { |
| boolean originalEnableVersionFile = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE); |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, enableVersionFile); |
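//HIVE_WRITE_ACID_VERSION_FILE controls whether the "_orc_acid_version" marker file is written
//into each acid directory; checkAcidVersion below verifies its presence (or absence) for every
//directory matching the given prefixes.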
| |
| hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); |
| // Need to close the thread local Hive object so that configuration change is reflected to HMS. |
| Hive.closeCurrent(); |
| runStatementOnDriver("drop table if exists T"); |
| runStatementOnDriver("create table T (a int, b int) stored as orc"); |
| int[][] data = {{1, 2}}; |
| //create 1 delta file bucket_00000 |
| runStatementOnDriver("insert into T" + makeValuesClause(data)); |
| runStatementOnDriver("update T set a=3 where b=2"); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(getWarehouseDir(), "t"), true); |
| CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile, |
| new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX }); |
| |
| runStatementOnDriver("alter table T compact 'minor'"); |
| runWorker(hiveConf); |
| |
| // Check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 0 compaction state", |
| TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); |
| Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); |
| Assert.assertTrue(resp.getCompacts().get(0).getType().equals(CompactionType.MINOR)); |
| |
| // Check the files after minor compaction |
| files = fs.listFiles(new Path(getWarehouseDir(), "t"), true); |
| CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile, |
| new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX }); |
| |
| runStatementOnDriver("insert into T" + makeValuesClause(data)); |
| |
| runStatementOnDriver("alter table T compact 'major'"); |
| runWorker(hiveConf); |
| |
| // Check status of compaction job |
| txnHandler = TxnUtils.getTxnStore(hiveConf); |
| resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 1 compaction state", |
| TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState()); |
| Assert.assertTrue(resp.getCompacts().get(1).getHadoopJobId().startsWith("job_local")); |
| |
| // Check the files after major compaction |
| files = fs.listFiles(new Path(getWarehouseDir(), "t"), true); |
| CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile, |
| new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.BASE_PREFIX }); |
| |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile); |
| } |
| |
| @Test |
| public void testTruncateWithBase() throws Exception{ |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, true); |
| |
| runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(3,4)"); |
| runStatementOnDriver("truncate table " + Table.ACIDTBL); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBL.toString().toLowerCase()), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000002", name); |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL); |
| Assert.assertEquals(0, r.size()); |
| } |
| |
| @Test |
| public void testTruncateWithBaseAllPartition() throws Exception{ |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, true); |
| |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='a') values(1,2),(3,4)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='b') values(1,2),(3,4)"); |
| runStatementOnDriver("truncate table " + Table.ACIDTBLPART); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000003", name); |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART); |
| Assert.assertEquals(0, r.size()); |
| } |
| |
| @Test |
| public void testTruncateWithBaseOnePartition() throws Exception{ |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, true); |
| |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='a') values(1,2),(3,4)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='b') values(5,5),(4,4)"); |
| runStatementOnDriver("truncate table " + Table.ACIDTBLPART + " partition(p='b')"); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000003", name); |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), |
| AcidUtils.deltaFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 delta and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART); |
| Assert.assertEquals(2, r.size()); |
| } |
| |
| @Test |
| public void testDropWithBaseAndRecreateOnePartition() throws Exception { |
| dropWithBaseOnePartition(true); |
| } |
| @Test |
| public void testDropWithBaseOnePartition() throws Exception { |
| dropWithBaseOnePartition(false); |
| } |
| |
| private void dropWithBaseOnePartition(boolean reCreate) throws Exception { |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true); |
| runStatementOnDriver("alter table " + Table.ACIDTBLPART + " drop partition (p='b')"); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000003", name); |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART); |
| Assert.assertEquals(2, r.size()); |
| Assert.assertTrue(isEqualCollection(r, asList("1\t2\ta", "3\t4\ta"))); |
| |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertTrue(resp.getCompacts().stream().anyMatch( |
| ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname()))); |
| if (reCreate) { |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (3,3)"); |
| } |
| runCleaner(hiveConf); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()), |
| path -> path.getName().equals("p=b")); |
| if ((reCreate ? 1 : 0) != stat.length) { |
| Assert.fail("Partition data was " + (reCreate ? "" : "not") + " removed from FS"); |
| } |
| if (reCreate) { |
| r = runStatementOnDriver("select * from " + Table.ACIDTBLPART); |
| Assert.assertEquals(3, r.size()); |
| Assert.assertTrue(isEqualCollection(r, asList("1\t2\ta", "3\t4\ta", "3\t3\tb"))); |
| } |
| } |
| |
| @Test |
| public void testDropWithBaseMultiplePartitions() throws Exception { |
| runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='a') values (1,1),(2,2)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='b') values (3,3),(4,4)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='c') values (7,7),(8,8)"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true); |
| runStatementOnDriver("alter table " + Table.ACIDTBLNESTEDPART + " drop partition (p2='a')"); |
| |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat; |
| |
| for (char p : asList('a', 'b')) { |
| String partName = "p1=a/p2=a/p3=" + p; |
| Assert.assertTrue(resp.getCompacts().stream().anyMatch( |
| ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname()))); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/" + partName), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000004", name); |
| } |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=b/p3=c"), |
| AcidUtils.baseFileFilter); |
| if (0 != stat.length) { |
| Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLNESTEDPART); |
| Assert.assertEquals(2, r.size()); |
| |
| runCleaner(hiveConf); |
| |
| for (char p : asList('a', 'b')) { |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=a"), |
| path -> path.getName().equals("p3=" + p)); |
| if (0 != stat.length) { |
| Assert.fail("Partition data was not removed from FS"); |
| } |
| } |
| } |
| |
| @Test |
| public void testDropDatabaseCascadePerTableNonBlocking() throws Exception { |
| MetastoreConf.setLongVar(hiveConf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX, 1); |
| dropDatabaseCascadeNonBlocking(); |
| } |
| |
| @Test |
| public void testDropDatabaseCascadePerDbNonBlocking() throws Exception { |
| dropDatabaseCascadeNonBlocking(); |
| } |
| |
| @Test |
| public void testDropDatabaseCascadePerDbNonBlockingFilterTableNames() throws Exception { |
| DummyMetaStoreFilterHookImpl.blockResults = true; |
| dropDatabaseCascadeNonBlocking(); |
| } |
| |
| private void dropDatabaseCascadeNonBlocking() throws Exception { |
| String database = "mydb"; |
| String tableName = "tab_acid"; |
| |
| runStatementOnDriver("drop database if exists " + database + " cascade"); |
| runStatementOnDriver("create database " + database); |
| |
| // Create transactional table/materialized view with lockless-reads feature disabled |
| runStatementOnDriver("create table " + database + "." + tableName + "1 (a int, b int) " + |
| "partitioned by (ds string) stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + database + "." + tableName + "1 partition(ds) values(1,2,'foo'),(3,4,'bar')"); |
| |
| runStatementOnDriver("create materialized view " + database + ".mv_" + tableName + "1 " + |
| "partitioned on (ds) stored as orc TBLPROPERTIES ('transactional'='true')" + |
| "as select a, ds from " + database + "." + tableName + "1 where b > 1"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true); |
| // Create transactional table/materialized view with lockless-reads feature enabled |
| runStatementOnDriver("create table " + database + "." + tableName + "2 (a int, b int) " + |
| "partitioned by (ds string) stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + database + "." + tableName + "2 partition(ds) values(1,2,'foo'),(3,4,'bar')"); |
| |
| runStatementOnDriver("create materialized view " + database + ".mv_" + tableName + "2 " + |
| "partitioned on (ds) stored as orc TBLPROPERTIES ('transactional'='true')" + |
| "as select a, ds from " + database + "." + tableName + "2 where b > 1"); |
| |
| // Create external partition data |
| runStatementOnDriver("drop table if exists Tstage"); |
| runStatementOnDriver("create table Tstage (a int, b int) stored as orc" + |
| " tblproperties('transactional'='false')"); |
| runStatementOnDriver("insert into Tstage values(0,2),(0,4)"); |
| |
| runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); |
| runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'"); |
| |
| // Create external table |
| runStatementOnDriver("create external table " + database + ".tab_ext (a int, b int) " + |
| "partitioned by (ds string) stored as parquet"); |
| runStatementOnDriver("insert into " + database + ".tab_ext partition(ds) values(1,2,'foo'),(3,4,'bar')"); |
| // Add partition with external location |
| runStatementOnDriver("alter table " + database + ".tab_ext add partition (ds='baz') location '" +getWarehouseDir() + "/1/data'"); |
| |
| // Create managed table |
| runStatementOnDriver("create table " + database + ".tab_nonacid (a int, b int) " + |
| "partitioned by (ds string) stored as parquet"); |
| runStatementOnDriver("insert into " + database + ".tab_nonacid partition(ds) values(1,2,'foo'),(3,4,'bar')"); |
| // Add partition with external location |
| runStatementOnDriver("alter table " + database + ".tab_nonacid add partition (ds='baz') location '" +getWarehouseDir() + "/2/data'"); |
| |
| // Drop database cascade |
| runStatementOnDriver("drop database " + database + " cascade"); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db"), |
| t -> t.getName().matches("(mv_)?" + tableName + "2" + SOFT_DELETE_TABLE_PATTERN)); |
if (2 != stat.length) {
Assert.fail("Expecting 2 soft-deleted dirs (table + materialized view) and found " + stat.length + ": " + Arrays.toString(stat));
}
| stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db")); |
| Assert.assertEquals(2, stat.length); |
| // External table under warehouse external location should be removed |
| stat = fs.listStatus(new Path(getWarehouseDir(), "ext")); |
| Assert.assertEquals(0, stat.length); |
| // External partition for the external table should remain |
| stat = fs.listStatus(new Path(getWarehouseDir(),"1"), |
| t -> t.getName().equals("data")); |
| Assert.assertEquals(1, stat.length); |
| // External partition for managed table should be removed |
| stat = fs.listStatus(new Path(getWarehouseDir(), "2"), |
| t -> t.getName().equals("data")); |
| Assert.assertEquals(0, stat.length); |
| |
| runCleaner(hiveConf); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db"), |
| t -> t.getName().matches("(mv_)?" + tableName + "2" + SOFT_DELETE_TABLE_PATTERN)); |
| if (stat.length != 0) { |
| Assert.fail("Table data was not removed from FS"); |
| } |
| } |
| |
| @Test |
| public void testDropTableWithSuffix() throws Exception { |
| String tableName = "tab_acid"; |
| runStatementOnDriver("drop table if exists " + tableName); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true); |
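//HIVE_ACID_CREATE_TABLE_USE_SUFFIX makes the table location carry a suffix (matched below by
//SOFT_DELETE_TABLE_PATTERN), so DROP TABLE only removes metadata and the suffixed data dir
//survives on the FS until the cleaner removes it.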
| |
| runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)"); |
| runStatementOnDriver("drop table " + tableName); |
| |
| int count = TestTxnDbUtil.countQueryAgent(hiveConf, |
| "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); |
| Assert.assertEquals(1, count); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), |
| t -> t.getName().matches(tableName + SOFT_DELETE_TABLE_PATTERN)); |
| if (1 != stat.length) { |
| Assert.fail("Table data was removed from FS"); |
| } |
| MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService(); |
| houseKeeperService.setConf(hiveConf); |
| |
| houseKeeperService.run(); |
| count = TestTxnDbUtil.countQueryAgent(hiveConf, |
| "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); |
| Assert.assertEquals(0, count); |
| |
| try { |
| runStatementOnDriver("select * from " + tableName); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getMessage().contains( |
| ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(tableName, "'")))); |
| } |
| // Check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 0 compaction state", |
| TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); |
| |
| runCleaner(hiveConf); |
| |
| FileStatus[] status = fs.listStatus(new Path(getWarehouseDir()), |
| t -> t.getName().matches(tableName + SOFT_DELETE_TABLE_PATTERN)); |
| Assert.assertEquals(0, status.length); |
| } |
| |
| @Test |
| public void testDropTableWithoutSuffix() throws Exception { |
| String tableName = "tab_acid"; |
| runStatementOnDriver("drop table if exists " + tableName); |
| |
| for (boolean enabled : asList(false, true)) { |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled); |
| runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, !enabled); |
| runStatementOnDriver("drop table " + tableName); |
| |
| int count = TestTxnDbUtil.countQueryAgent(hiveConf, |
| "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); |
| Assert.assertEquals(0, count); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), |
| t -> t.getName().equals(tableName)); |
| Assert.assertEquals(0, stat.length); |
| |
| try { |
| runStatementOnDriver("select * from " + tableName); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getMessage().contains( |
| ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(tableName, "'")))); |
| } |
| // Check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 0, resp.getCompactsSize()); |
| } |
| } |
| |
| @Test |
| public void testDropMaterializedViewWithSuffix() throws Exception { |
| String tableName = "tab_acid"; |
| String mviewName = "mv_" + tableName; |
| runStatementOnDriver("drop materialized view if exists " + mviewName); |
| runStatementOnDriver("drop table if exists " + tableName); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true); |
| |
| runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)"); |
| runStatementOnDriver("create materialized view " + mviewName + " stored as orc TBLPROPERTIES ('transactional'='true') " + |
| "as select a from tab_acid where b > 1"); |
| runStatementOnDriver("drop materialized view " + mviewName); |
| |
| int count = TestTxnDbUtil.countQueryAgent(hiveConf, |
| "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'"); |
| Assert.assertEquals(1, count); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), |
| t -> t.getName().matches(mviewName + SOFT_DELETE_TABLE_PATTERN)); |
| if (1 != stat.length) { |
| Assert.fail("Materialized view data was removed from FS"); |
| } |
| MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService(); |
| houseKeeperService.setConf(hiveConf); |
| |
| houseKeeperService.run(); |
| count = TestTxnDbUtil.countQueryAgent(hiveConf, |
| "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'"); |
| Assert.assertEquals(0, count); |
| |
| try { |
| runStatementOnDriver("select * from " + mviewName); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getMessage().contains( |
| ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(mviewName, "'")))); |
| } |
| // Check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertEquals("Unexpected 0 compaction state", |
| TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); |
| |
| runCleaner(hiveConf); |
| |
| FileStatus[] status = fs.listStatus(new Path(getWarehouseDir()), |
| t -> t.getName().matches(mviewName + SOFT_DELETE_TABLE_PATTERN)); |
| Assert.assertEquals(0, status.length); |
| } |
| |
| @Test |
| public void testDropMaterializedViewWithoutSuffix() throws Exception { |
| String tableName = "tab_acid"; |
| String mviewName = "mv_" + tableName; |
| runStatementOnDriver("drop materialized view if exists " + mviewName); |
| |
| for (boolean enabled : asList(false, true)) { |
| runStatementOnDriver("drop table if exists " + tableName); |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled); |
| |
| runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)"); |
| runStatementOnDriver("create materialized view " + mviewName + " stored as orc TBLPROPERTIES ('transactional'='true') " + |
| "as select a from tab_acid where b > 1"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, !enabled); |
| runStatementOnDriver("drop materialized view " + mviewName); |
| |
| int count = TestTxnDbUtil.countQueryAgent(hiveConf, |
| "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'"); |
| Assert.assertEquals(0, count); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), |
| t -> t.getName().equals(mviewName)); |
| Assert.assertEquals(0, stat.length); |
| |
| try { |
| runStatementOnDriver("select * from " + mviewName); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getMessage().contains( |
| ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(mviewName, "'")))); |
| } |
| // Check status of compaction job |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 0, resp.getCompactsSize()); |
| } |
| } |
| |
| @Test |
| public void testRenameMakeCopyPartition() throws Exception { |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY, true); |
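//With HIVE_ACID_RENAME_PARTITION_MAKE_COPY, renaming a partition copies its data to the new
//location (as deltas, no base, per the checks below) and soft-deletes the old dir by writing
//an empty base there; the old dir is removed later by the cleaner.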
| runStatementOnDriver("alter table " + Table.ACIDTBLPART + " partition (p='b') rename to partition (p='c')"); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000003", name); |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), |
| AcidUtils.baseFileFilter); |
| if (0 != stat.length) { |
| Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART + " where p='b'"); |
| Assert.assertEquals(0, r.size()); |
| |
| r = runStatementOnDriver("select * from " + Table.ACIDTBLPART); |
| Assert.assertEquals(4, r.size()); |
| |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| Assert.assertTrue(resp.getCompacts().stream().anyMatch( |
| ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname()))); |
| |
| runCleaner(hiveConf); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()), |
| path -> path.getName().equals("p=b")); |
| if (0 != stat.length) { |
| Assert.fail("Expecting partition data to be removed from FS"); |
| } |
| } |
| |
| @Test |
| public void testRenameMakeCopyNestedPartition() throws Exception { |
| runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='c') values (1,1),(2,2)"); |
| runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='d') values (3,3),(4,4)"); |
| |
| HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY, true); |
| runStatementOnDriver("alter table " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='d')" + |
| " rename to partition (p1='a', p2='c', p3='d')"); |
| |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); |
| Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); |
| |
| FileSystem fs = FileSystem.get(hiveConf); |
| FileStatus[] stat; |
| |
| String partName = "p1=a/p2=b/p3=d"; |
| Assert.assertTrue(resp.getCompacts().stream().anyMatch( |
| ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname()))); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/" + partName), |
| AcidUtils.baseFileFilter); |
| if (1 != stat.length) { |
| Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| String name = stat[0].getPath().getName(); |
| Assert.assertEquals("base_0000003", name); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=c/p3=d"), |
| AcidUtils.baseFileFilter); |
| if (0 != stat.length) { |
| Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat)); |
| } |
| |
| List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLNESTEDPART); |
| Assert.assertEquals(4, r.size()); |
| |
| runCleaner(hiveConf); |
| |
| stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=b"), |
| path -> path.getName().equals("p3=d")); |
| if (0 != stat.length) { |
| Assert.fail("Expecting partition data to be removed from FS"); |
| } |
| } |
| |
| @Test |
| public void testIsRawFormatFile() throws Exception { |
| dropTable(new String[]{"file_formats"}); |
| |
| runStatementOnDriver("CREATE TABLE `file_formats`(`id` int, `name` string) " + |
| " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " + |
| "WITH SERDEPROPERTIES ( " + |
| " 'field.delim'='|', " + |
| " 'line.delim'='\n'," + |
| " 'serialization.format'='|') " + |
| "STORED AS " + |
| " INPUTFORMAT " + |
| " 'org.apache.hadoop.mapred.TextInputFormat' " + |
| " OUTPUTFORMAT " + |
| " 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " + |
| "TBLPROPERTIES ( " + |
| " 'transactional'='true'," + |
| " 'transactional_properties'='insert_only')"); |
| |
| runStatementOnDriver("insert into file_formats (id, name) values (1, 'Avro'),(2, 'Parquet'),(3, 'ORC')"); |
| |
| List<String> res = runStatementOnDriver("select * from file_formats"); |
| Assert.assertEquals(3, res.size()); |
| } |
| @Test |
| public void testShowCompactions() throws Exception { |
| //generate some compaction history |
| runStatementOnDriver("drop database if exists mydb1 cascade"); |
| runStatementOnDriver("create database mydb1"); |
| runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + |
| " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + |
| " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p2') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p3') compact 'MAJOR' pool 'pool0'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| |
| SessionState.get().setCurrentDatabase("mydb1"); |
| |
| //testing show compaction command |
| ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); |
| List<String> r = runStatementOnDriver("SHOW COMPACTIONS"); |
| Assert.assertEquals(rsp.getCompacts().size()+1, r.size());//includes Header row |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'ready for cleaning'"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getState().equals("ready for cleaning")).count() +1, |
| r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| Pattern p = Pattern.compile(".*mydb1.*\tready for cleaning.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| r = runStatementOnDriver("SHOW COMPACTIONS COMPACTIONID=1"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getId()==1).count() +1, |
| r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile("1\t.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 TYPE 'MAJOR' "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb1.*\tMAJOR.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 POOL 'poolx' TYPE 'MINOR' "); |
| //includes Header row |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getPoolName().equals("poolx")).filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size()); |
| Assert.assertEquals(1,r.size());//only header row |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 POOL 'pool0' TYPE 'MAJOR'"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getPoolName().equals("pool0")).filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb1.*\tMAJOR.*\tpool0.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 POOL 'pool0'"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getPoolName().equals("pool0")).count()+1, r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb1.*\tpool0.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
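// DATABASE is accepted as a synonym for SCHEMA and must yield the same rows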
| r = runStatementOnDriver("SHOW COMPACTIONS DATABASE mydb1 POOL 'pool0'"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getPoolName().equals("pool0")).count()+1, r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb1.*\tpool0.*"); |
| for(int i = 1; i < r.size(); i++) { |
Assert.assertTrue(p.matcher(r.get(i)).matches());
| } |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS tbl0 TYPE 'MAJOR' "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getTablename().equals("tbl0")). |
| filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());//includes Header row |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*tbl0.*\tMAJOR.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
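// the PARTITION spec is matched against its canonical key=value form (p='p3' -> p=p3)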
| r = runStatementOnDriver("SHOW COMPACTIONS mydb1.tbl0 PARTITION (p='p3') "); |
| //includes Header row |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getTablename().equals("tbl0")).filter(x->x.getPartitionname().equals("p=p3")).count() + 1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb1\ttbl0\tp=p3.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS mydb1.tbl0 PARTITION (p='p3') pool 'pool0' TYPE 'MAJOR'"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")). |
| filter(x->x.getTablename().equals("tbl0")).filter(x->x.getPartitionname().equals("p=p3")). |
| filter(x->x.getPoolName().equals("pool0")).filter(x->x.getType().equals(CompactionType.MAJOR)).count() + 1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb1\ttbl0\tp=p3\tMAJOR.*\tpool0.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| } |
| |
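/**
 * Verifies SHOW COMPACTIONS filters (status, table, partition, type, database) against
 * compaction requests that target individual partitions.
 */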
| @Test |
public void testShowCompactionFilterWithPartition() throws Exception {
setUpCompactionRequestsData("mydb", "tbl2");
executeCompactionRequest("mydb", "tbl2", "MAJOR", "ds='mon'");
| SessionState.get().setCurrentDatabase("mydb"); |
| |
| ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); |
| |
| //includes Header row |
| List<String> r = runStatementOnDriver("SHOW COMPACTIONS"); |
| Assert.assertEquals(rsp.getCompacts().size()+1, r.size()); |
| |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS tbl2 STATUS 'refused'"); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getState().equals("refused")).count()+1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| Pattern p = Pattern.compile(".*tbl2.*\trefused.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS tbl2 "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getTablename().equals("tbl2")).count()+1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*tbl2.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS mydb.tbl2 PARTITION (ds='mon') "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")). |
| filter(x->x.getTablename().equals("tbl2")).filter(x->x.getPartitionname().equals("ds=mon")).count()+1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb\ttbl2\tds=mon.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS mydb.tbl2 PARTITION (ds='mon') TYPE 'MAJOR' "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")). |
| filter(x->x.getTablename().equals("tbl2")).filter(x->x.getPartitionname().equals("ds=mon")). |
| filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb\ttbl2\tds=mon\tMAJOR.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS DATABASE mydb TYPE 'MAJOR' "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")). |
| filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb.*\tMAJOR.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
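// enqueue and run a MINOR compaction so the MINOR filters below have rows to match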
| executeCompactionRequest("mydb","tbl2", "MINOR","ds='wed'"); |
| |
| rsp = txnHandler.showCompact(new ShowCompactRequest()); |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MINOR' "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")). |
| filter(x->x.getType().equals(CompactionType.MINOR)).count()+1, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| p = Pattern.compile(".*mydb.*\tMINOR.*"); |
| for(int i = 1; i < r.size(); i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| //includes Header row |
| r = runStatementOnDriver("SHOW COMPACTIONS mydb.tbl2 PARTITION (ds='wed') "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")). |
| filter(x->x.getTablename().equals("tbl2")).filter(x->x.getPartitionname().equals("ds=wed")).count()+1, r.size()); |
for(int i = 1; i < r.size(); i++) {
| Assert.assertTrue(r.get(i).contains("mydb")); |
| Assert.assertTrue(r.get(i).contains("tbl2")); |
| Assert.assertTrue(r.get(i).contains("ds=wed")); |
| } |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS tbl2 "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getTablename().equalsIgnoreCase("tbl2")).count()+1, r.size()); |
for(int i = 1; i < r.size(); i++) {
| Assert.assertTrue(r.get(i).contains("tbl2")); |
| } |
//includes Header row; a database with no compaction history matches nothing
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mymydbdb2 TYPE 'MAJOR' "); |
| Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mymydbdb2")). |
| filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size()); |
| Assert.assertEquals(1,r.size());//only header row |
| } |
@Test
public void testShowCompactionInputValidation() throws Exception {
setUpCompactionRequestsData("mydb2", "tbl2");
executeCompactionRequest("mydb2", "tbl2", "MAJOR", "ds='mon'");
SessionState.get().setCurrentDatabase("mydb2");

//validation testing of parameters: assert each malformed statement fails on its own,
//rather than relying on a single ExpectedException that stops at the first throw
Assert.assertThrows(RuntimeException.class, () ->
runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb POOL 'pool0' TYPE 'MAJOR'"));// validates db
Assert.assertThrows(RuntimeException.class, () ->
runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb2 TYPE 'MAJR'"));// validates compaction type
Assert.assertThrows(RuntimeException.class, () ->
runStatementOnDriver("SHOW COMPACTIONS mydb2.tbl1 PARTITION (ds='mon') TYPE 'MINOR' " +
"STATUS 'ready for clean'"));// validates table
Assert.assertThrows(RuntimeException.class, () ->
runStatementOnDriver("SHOW COMPACTIONS mydb2.tbl2 PARTITION (p=101,day='Monday') POOL 'pool0' TYPE 'minor' " +
"STATUS 'ready for clean'"));// validates partspec
Assert.assertThrows(RuntimeException.class, () ->
runStatementOnDriver("SHOW COMPACTIONS mydb1.tbl0 PARTITION (p='p1') POOL 'pool0' TYPE 'minor' " +
"STATUS 'ready for clean'"));// validates compaction status
}
| |
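/** Exercises the LIMIT and ORDER BY options of SHOW COMPACTIONS across several databases. */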
| @Test |
| public void testShowCompactionFilterSortingAndLimit() throws Exception { |
| runStatementOnDriver("drop database if exists mydb1 cascade"); |
| runStatementOnDriver("create database mydb1"); |
| runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + |
| " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR' pool 'poolx'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("drop database if exists mydb cascade"); |
| runStatementOnDriver("create database mydb"); |
| runStatementOnDriver("create table mydb.tbl " + "(a int, b int) partitioned by (ds string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb.tbl" + " PARTITION(ds) " + |
| " values(1,2,'mon'),(3,4,'tue'),(1,2,'mon'),(3,4,'tue'),(1,2,'wed'),(3,4,'wed')"); |
| runStatementOnDriver("alter table mydb.tbl" + " PARTITION(ds='mon') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb.tbl" + " PARTITION(ds='tue') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| runStatementOnDriver("create table mydb.tbl2 " + "(a int, b int) partitioned by (dm string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb.tbl2" + " PARTITION(dm) " + |
| " values(1,2,'xxx'),(3,4,'xxx'),(1,2,'yyy'),(3,4,'yyy'),(1,2,'zzz'),(3,4,'zzz')"); |
| runStatementOnDriver("alter table mydb.tbl2" + " PARTITION(dm='yyy') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb.tbl2" + " PARTITION(dm='zzz') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); |
| |
| //includes Header row |
| List<String> r = runStatementOnDriver("SHOW COMPACTIONS"); |
| Assert.assertEquals(rsp.getCompacts().size() + 1, r.size()); |
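// LIMIT caps the number of data rows; the header row is returned on top of the limit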
| r = runStatementOnDriver("SHOW COMPACTIONS LIMIT 3"); |
| Assert.assertEquals(4, r.size()); |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' LIMIT 2"); |
| Assert.assertEquals(3, r.size()); |
| |
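// ORDER BY accepts quoted column keys: sort by table name descending, then partition name ascending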
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY 'tabname' DESC,'partname' ASC"); |
| Assert.assertEquals(5, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| Pattern p = Pattern.compile(".*mydb\ttbl2\tdm.*"); |
| for (int i = 1; i < r.size() - 3; i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| p = Pattern.compile(".*mydb\ttbl\tds.*"); |
| for (int i = 3; i < r.size() - 1; i++) { |
| Assert.assertTrue(p.matcher(r.get(i)).matches()); |
| } |
| |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 TYPE 'MAJOR' ORDER BY 'poolname' ASC"); |
| Assert.assertEquals(3, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
List<String> poolNameActualList = r.stream().skip(1).map(x -> x.split("\t")[15]).collect(Collectors.toList());
List<String> poolNameExpectedList = r.stream().skip(1).map(x -> x.split("\t")[15]).sorted(Collections.reverseOrder()).
collect(Collectors.toList());
Assert.assertEquals(poolNameExpectedList, poolNameActualList);
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY 'txnid' DESC"); |
| Assert.assertEquals(5, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
List<String> txnIdActualList = r.stream().skip(1).map(x -> x.split("\t")[16]).collect(Collectors.toList());
List<String> txnIdExpectedList = r.stream().skip(1).map(x -> x.split("\t")[16]).sorted(Collections.reverseOrder()).
collect(Collectors.toList());
Assert.assertEquals(txnIdExpectedList, txnIdActualList);
| |
| r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY TxnId DESC"); |
| Assert.assertEquals(5, r.size()); |
| Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" + |
| "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" + |
| "Highest WriteId", r.get(0)); |
| txnIdActualList = r.stream().skip(1).map(x -> x.split("\t")[16]).collect(Collectors.toList()); |
| txnIdExpectedList = r.stream().skip(1).map(x -> x.split("\t")[16]).sorted(Collections.reverseOrder()). |
| collect(Collectors.toList()); |
| Assert.assertEquals(txnIdExpectedList, txnIdActualList); |
| |
// invalid ORDER BY keys must be rejected
Assert.assertThrows(RuntimeException.class, () ->
runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY tbl DESC,PARTITIONS ASC"));
| |
| } |
| |
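/**
 * ABORT COMPACTIONS: requests still in 'initiated' state can be aborted once;
 * a second abort of the same ids must report an error.
 */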
| @Test |
| public void testAbortCompactions() throws Exception { |
| //generate some compaction history |
| runStatementOnDriver("drop database if exists mydb1 cascade"); |
| runStatementOnDriver("create database mydb1"); |
| runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + |
| " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'"); |
| TestTxnCommands2.runInitiator(hiveConf); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + |
| " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p2') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p3') compact 'MAJOR' pool 'pool0'"); |
| TestTxnCommands2.runInitiator(hiveConf); |
| TestTxnCommands2.runWorker(hiveConf); |
| TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); |
| runStatementOnDriver("create table mydb1.tbl2 " + "(a int, b int) partitioned by (p string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " + |
| " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p1') compact 'MAJOR'"); |
| runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p2') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| TestTxnCommands2.runCleaner(hiveConf); |
| runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p3') compact 'MAJOR'"); |
| runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + |
| " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')"); |
| TestTxnCommands2.runWorker(hiveConf); |
| TestTxnCommands2.runCleaner(hiveConf); |
| runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " + |
| " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')"); |
| runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION (p='p1') compact 'MINOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| runStatementOnDriver("create table mydb1.tbl3 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " + |
| BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into mydb1.tbl3" + " PARTITION(ds) " + |
| " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')"); |
| runStatementOnDriver("alter table mydb1.tbl3" + " PARTITION(ds='today') compact 'MAJOR'"); |
| TestTxnCommands2.runWorker(hiveConf); |
| |
| SessionState.get().setCurrentDatabase("mydb1"); |
| |
//testing abort compactions command

List<String> r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'initiated'");
Assert.assertEquals(3, r.size());
List<String> compIdsToAbort = r.stream().skip(1).map(x -> x.split("\t")[0]).collect(Collectors.toList());
String abortCompactionCmd = "ABORT COMPACTIONS " + compIdsToAbort.get(0) + " " + compIdsToAbort.get(1);
r = runStatementOnDriver(abortCompactionCmd);
Assert.assertEquals(3, r.size());
Assert.assertEquals("CompactionId\tStatus\tMessage", r.get(0));
Assert.assertTrue(r.get(1).contains("Successfully aborted compaction"));
Assert.assertTrue(r.get(2).contains("Successfully aborted compaction"));

//aborting the same compactions again must report an error
abortCompactionCmd = "ABORT COMPACTIONS " + compIdsToAbort.get(0) + " " + compIdsToAbort.get(1);
r = runStatementOnDriver(abortCompactionCmd);
Assert.assertEquals(3, r.size());
Assert.assertEquals("CompactionId\tStatus\tMessage", r.get(0));
Assert.assertTrue(r.get(1).contains("Error"));

r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'aborted'");
Assert.assertEquals(3, r.size());
| |
| } |
| |
| |
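/** Recreates dbName.tbName as a partitioned transactional table with three populated partitions. */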
| private void setUpCompactionRequestsData(String dbName, String tbName) throws Exception { |
| runStatementOnDriver("drop database if exists " + dbName); |
| runStatementOnDriver("create database " + dbName); |
| runStatementOnDriver("create table " + dbName + "." + tbName + " (a int, b int) partitioned by (ds String) stored as orc " + |
| "TBLPROPERTIES ('transactional'='true')"); |
| runStatementOnDriver("insert into " + dbName + "." + tbName + " PARTITION (ds) " + |
| " values(1,2,'mon'),(3,4,'mon'),(1,2,'tue'),(3,4,'tue'),(1,2,'wed'),(3,4,'wed')"); |
| } |
| |
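/** Enqueues a compaction of the given type for one partition and runs the worker to execute it. */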
private void executeCompactionRequest(String dbName, String tbName, String compactionType, String partition) throws Exception {
runStatementOnDriver("alter table " + dbName + "." + tbName + " PARTITION (" + partition + ") compact '" + compactionType + "'");
| TestTxnCommands2.runWorker(hiveConf); |
| } |
| |
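/** Fetch-task caching is expected only when fetch-task conversion ('more') turns the query into a simple fetch. */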
| @Test |
| public void testFetchTaskCachingWithConversion() throws Exception { |
| dropTable(new String[]{"fetch_task_table"}); |
List<String> actualRes = new ArrayList<>();
| runStatementOnDriver("create table fetch_task_table (a INT, b INT) stored as orc" + |
| " tblproperties ('transactional'='true')"); |
| runStatementOnDriver("insert into table fetch_task_table values (1,2), (3,4), (5,6)"); |
List<String> expectedRes = runStatementOnDriver("select * from fetch_task_table");
| |
| hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING, true); |
| hiveConf.setVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "none"); |
| d.run("select * from fetch_task_table"); |
| Assert.assertFalse(d.getFetchTask().isCachingEnabled()); |
| d.getFetchTask().fetch(actualRes); |
Assert.assertEquals(expectedRes, actualRes);
| actualRes.clear(); |
| |
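// with conversion 'more', the same query should be served through a caching fetch task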
| hiveConf.setVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "more"); |
| d.run("select * from fetch_task_table"); |
| Assert.assertTrue(d.getFetchTask().isCachingEnabled()); |
| d.getFetchTask().fetch(actualRes); |
Assert.assertEquals(expectedRes, actualRes);
| } |
| } |