blob: 64baa9f5b2021a8be560aa21f10e06f131019096 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * The LockManager is not ready, but for the no-concurrency, straight-line path we can
 * test autocommit (AC) = true and AC = false with commit/rollback/exception, and check the
 * resulting data.
 *
 * Can also test calling commit in AC=true mode, toggling AC, etc.
 */
public class TestTxnCommands {
/**
 * Scratch directory for this run: a path under java.io.tmpdir made from the class name and the
 * current time, with backslashes replaced by forward slashes.
 */
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
    File.separator + TestTxnCommands.class.getCanonicalName()
    + "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
/** Warehouse directory; set as the metastore warehouse location in setUp(). */
private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
/** Bucket count for test tables; set it to 1 for easier debugging. Never reassigned, hence final. */
private static final int BUCKET_COUNT = 2;
/** Gives access to the name of the running test method (used to name debug dump files). */
@Rule
public TestName testName = new TestName();
/** Configuration created in setUp(). */
private HiveConf hiveConf;
/** Driver every statement is run through; created in setUp() and reset to null in tearDown(). */
private Driver d;
private static enum Table {
ACIDTBL("acidTbl"),
ACIDTBL2("acidTbl2"),
NONACIDORCTBL("nonAcidOrcTbl"),
NONACIDORCTBL2("nonAcidOrcTbl2");
private final String name;
@Override
public String toString() {
return name;
}
Table(String name) {
this.name = name;
}
}
/**
 * Per-test fixture: cleans up whatever a previous test left behind, builds a fresh configuration,
 * recreates the warehouse directory, starts a session and driver, and (re)creates the test tables.
 */
@Before
public void setUp() throws Exception {
  tearDown();

  hiveConf = new HiveConf(getClass());
  hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
  hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
  hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
  hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
  hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
  hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
      "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
  TxnDbUtil.setConfValues(hiveConf);
  TxnDbUtil.prepDb();

  // Start from an empty warehouse directory.
  File warehouse = new File(TEST_WAREHOUSE_DIR);
  if (warehouse.exists()) {
    FileUtil.fullyDelete(warehouse);
  }
  boolean created = warehouse.mkdirs();
  if (!created) {
    throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
  }

  SessionState.start(new SessionState(hiveConf));
  d = new Driver(hiveConf);
  dropTables();

  final String twoColsBucketedByA =
      "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ";
  final String transactional = "('transactional'='true')";
  final String nonTransactional = "('transactional'='false')";
  runStatementOnDriver("create table " + Table.ACIDTBL + twoColsBucketedByA + transactional);
  runStatementOnDriver("create table " + Table.NONACIDORCTBL + twoColsBucketedByA + nonTransactional);
  runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + twoColsBucketedByA + nonTransactional);
  runStatementOnDriver("create temporary table " + Table.ACIDTBL2
      + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT
      + " buckets stored as orc TBLPROPERTIES " + transactional);
}
/** Runs "drop table if exists" for every member of {@link Table}, in declaration order. */
private void dropTables() throws Exception {
  Table[] all = Table.values();
  for (int i = 0; i < all.length; i++) {
    runStatementOnDriver("drop table if exists " + all[i]);
  }
}
/**
 * Drops the test tables and shuts the driver down if one exists, then always cleans the
 * transaction database and removes the test data directory.
 */
@After
public void tearDown() throws Exception {
  try {
    if (d == null) {
      // Nothing to shut down; the finally block below still runs.
      return;
    }
    runStatementOnDriver("set autocommit true");
    dropTables();
    d.destroy();
    d.close();
    d = null;
  } finally {
    TxnDbUtil.cleanDb();
    File dataDir = new File(TEST_DATA_DIR);
    FileUtils.deleteDirectory(dataDir);
  }
}
/** Runs an insert-overwrite between the two non-ACID tables, then creates a third non-ACID table. */
@Test
public void testInsertOverwrite() throws Exception {
  String overwrite =
      "insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2;
  runStatementOnDriver(overwrite);

  String createThirdTable = "create table " + Table.NONACIDORCTBL2
      + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT
      + " buckets stored as orc TBLPROPERTIES ('transactional'='false')";
  runStatementOnDriver(createThirdTable);
}
/** Two inserts into a non-ACID table, each followed by a select; results are not asserted. */
@Ignore("not needed but useful for testing")
@Test
public void testNonAcidInsert() throws Exception {
  final String selectAll = "select a,b from " + Table.NONACIDORCTBL;
  final String insertInto = "insert into " + Table.NONACIDORCTBL;

  runStatementOnDriver(insertInto + "(a,b) values(1,2)");
  List<String> afterFirstInsert = runStatementOnDriver(selectAll);

  runStatementOnDriver(insertInto + "(a,b) values(2,3)");
  List<String> afterSecondInsert = runStatementOnDriver(selectAll);
}
/**
 * Useful for debugging. Dumps ORC file in JSON to CWD.
 * Currently switched off by the unconditional early return; the actual dump call is commented out.
 */
private void dumpBucketData(Table table, long txnId, int stmtId, int bucketNum) throws Exception {
  if(true) {
    return;
  }
  Path tableDir = new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase());
  Path deltaDir = new Path(tableDir, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
  Path bucket = AcidUtils.createBucketFile(deltaDir, bucketNum);
  String dumpName = testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName();
  FileOutputStream delta = new FileOutputStream(dumpName);
  try {
    // FileDump.printJsonData(hiveConf, bucket.toString(), delta);
    // (a FileNotFoundException from the call above happens if you change BUCKET_COUNT)
  } finally {
    delta.close();
  }
}
/**
 * Dump all data in the table by bucket in JSON format: calls dumpBucketData once for each
 * bucket number from 0 up to BUCKET_COUNT - 1.
 */
private void dumpTableData(Table table, long txnId, int stmtId) throws Exception {
  int bucketNum = 0;
  while (bucketNum < BUCKET_COUNT) {
    dumpBucketData(table, txnId, stmtId, bucketNum);
    bucketNum++;
  }
}
/**
 * Inserts two rows in autocommit mode, then two more inside an explicit transaction, and checks
 * that all four rows are visible both inside the transaction and after everything is committed.
 */
@Test
public void testSimpleAcidInsert() throws Exception {
  int[][] rows1 = {{1,2},{3,4}};
  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
  //List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs);
  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  int[][] rows2 = {{5,6},{7,8}};
  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
  List<String> allData = stringifyValues(rows1);
  allData.addAll(stringifyValues(rows2));
  List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0);
  runStatementOnDriver("COMMIT WORK");
  dumpTableData(Table.ACIDTBL, 1, 0);
  dumpTableData(Table.ACIDTBL, 2, 0);
  runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  runStatementOnDriver("COMMIT");//txn started implicitly by previous statement
  runStatementOnDriver("set autocommit true");
  List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  // This check runs after the commit and looks at rs1; the message used to be a copy of the
  // in-transaction one, which made a failure here point at the wrong assertion.
  Assert.assertEquals("Data didn't match after commit (rs1)", allData, rs1);
}
/**
 * Checks that statements which are illegal for the current autocommit (AC) / transaction state
 * are rejected, and that a write made in a transaction is visible before and after switching
 * back to AC=true.
 *
 * TODO: add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
 * @throws Exception
 */
@Test
public void testErrors() throws Exception {
  runStatementOnDriver("set autocommit true");
  CommandProcessorResponse cpr = runStatementOnDriverNegative("start transaction");
  Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("start transaction");
  CommandProcessorResponse cpr2 = runStatementOnDriverNegative("create table foo(x int, y int)");
  Assert.assertEquals("Expected DDL to fail in an open txn", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr2.getErrorCode());
  runStatementOnDriver("set autocommit true");
  CommandProcessorResponse cpr3 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1");
  Assert.assertEquals("Expected update of bucket column to fail",
      "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported.  Column a.",
      cpr3.getErrorMessage());
  //line below should in principle work but Driver doesn't propagate errorCode properly
  //Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode());
  // The next two assertions used to inspect cpr (the response to the very first statement), so
  // they could never fail; they now check the response of the statement that was just run.
  cpr3 = runStatementOnDriverNegative("commit work");//not allowed in AC=true
  Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr3.getErrorCode());
  cpr3 = runStatementOnDriverNegative("rollback work");//not allowed in AC=true
  Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr3.getErrorCode());
  runStatementOnDriver("set autocommit false");
  // NOTE(review): the two assertions below still compare against cpr rather than cpr3. The error
  // code reported for commit/rollback when AC=false and no txn is open cannot be determined from
  // this file; confirm it against ErrorMsg before pointing these assertions at cpr3.
  cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx
  Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
  cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx
  Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
  runStatementOnDriver("start transaction");
  cpr3 = runStatementOnDriverNegative("start transaction");//not allowed in a tx
  Assert.assertEquals("Expected start transaction to fail", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode());
  runStatementOnDriver("start transaction");//ok since previously opened txn was killed
  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
  List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  Assert.assertEquals("Can't see my own write", 1, rs0.size());
  runStatementOnDriver("set autocommit true");//this should commit previous txn
  rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  Assert.assertEquals("Can't see my own write", 1, rs0.size());
}
/**
 * Inside a transaction: the table starts empty, an inserted row is visible to the same
 * transaction, and it is still visible from a new transaction after the commit.
 */
@Test
public void testReadMyOwnInsert() throws Exception {
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  List<String> initial = runStatementOnDriver("select * from " + Table.ACIDTBL);
  Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, initial.size());

  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
  List<String> insideTxn = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Can't see my own write", 1, insideTxn.size());
  runStatementOnDriver("commit");

  runStatementOnDriver("START TRANSACTION");
  List<String> afterCommit = runStatementOnDriver(selectOrdered);
  runStatementOnDriver("rollback work");
  Assert.assertEquals("Can't see write after commit", 1, afterCommit.size());
}
/**
 * Inserts a row inside a transaction, runs a statement that fails, and then checks from a new
 * transaction that the inserted row is gone.
 */
@Test
public void testImplicitRollback() throws Exception {
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
  List<String> ownWrite = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Can't see my own write", 1, ownWrite.size());

  //next command should produce an error
  CommandProcessorResponse failure = runStatementOnDriverNegative("select * from no_such_table");
  String failureMessage = failure.getErrorMessage();
  Assert.assertEquals("Txn didn't fail?",
      "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'",
      failureMessage);

  runStatementOnDriver("start transaction");
  List<String> afterFailure = runStatementOnDriver(selectOrdered);
  runStatementOnDriver("commit");
  Assert.assertEquals("Didn't rollback as expected", 0, afterFailure.size());
}
/** Inserts a row inside a transaction, rolls back explicitly, and expects the table to be empty. */
@Test
public void testExplicitRollback() throws Exception {
  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  String insert = "insert into " + Table.ACIDTBL + "(a,b) values(1,2)";
  runStatementOnDriver(insert);
  runStatementOnDriver("ROLLBACK");
  runStatementOnDriver("set autocommit true");

  String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";
  List<String> remaining = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Rollback didn't rollback", 0, remaining.size());
}
/**
 * Runs two inserts inside one transaction and checks that the combined rows are visible both
 * before the commit and afterwards in autocommit mode.
 */
@Test
public void testMultipleInserts() throws Exception {
  final String insertInto = "insert into " + Table.ACIDTBL + "(a,b) ";
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  int[][] firstBatch = {{1,2},{3,4}};
  runStatementOnDriver(insertInto + makeValuesClause(firstBatch));
  int[][] secondBatch = {{5,6},{7,8}};
  runStatementOnDriver(insertInto + makeValuesClause(secondBatch));

  List<String> expected = stringifyValues(firstBatch);
  expected.addAll(stringifyValues(secondBatch));

  List<String> beforeCommit = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Content didn't match before commit rs", expected, beforeCommit);
  runStatementOnDriver("commit");
  dumpTableData(Table.ACIDTBL, 1, 0);
  dumpTableData(Table.ACIDTBL, 1, 1);
  runStatementOnDriver("set autocommit true");

  List<String> afterCommit = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Content didn't match after commit rs1", expected, afterCommit);
}
/**
 * Deletes one of two committed rows inside a transaction and checks the remaining row both
 * before and after the commit.
 */
@Test
public void testDelete() throws Exception {
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";

  int[][] initialRows = {{1,2},{3,4}};
  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(initialRows));
  List<String> afterInsert = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Content didn't match rs0", stringifyValues(initialRows), afterInsert);

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
  int[][] survivingRows = {{1,2}};
  List<String> afterDelete = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after delete", stringifyValues(survivingRows), afterDelete);

  runStatementOnDriver("commit");
  runStatementOnDriver("set autocommit true");
  List<String> afterCommit = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after commit", stringifyValues(survivingRows), afterCommit);
}
/**
 * Updates, inside one transaction, both previously committed rows and rows inserted earlier in
 * the same transaction; checks the data before and after the commit.
 */
@Test
public void testUpdateOfInserts() throws Exception {
  final String insertInto = "insert into " + Table.ACIDTBL + "(a,b) ";
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";

  int[][] committedRows = {{1,2},{3,4}};
  runStatementOnDriver(insertInto + makeValuesClause(committedRows));
  List<String> afterFirstInsert = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Content didn't match rs0", stringifyValues(committedRows), afterFirstInsert);

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  int[][] txnRows = {{5,6},{7,8}};
  runStatementOnDriver(insertInto + makeValuesClause(txnRows));
  List<String> afterSecondInsert = runStatementOnDriver(selectOrdered);
  List<String> expectedAll = stringifyValues(committedRows);
  expectedAll.addAll(stringifyValues(txnRows));
  Assert.assertEquals("Content didn't match rs1", expectedAll, afterSecondInsert);

  runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
  int[][] updatedRows = {{1,1},{3,1},{5,1},{7,1}};
  List<String> afterUpdate = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after update", stringifyValues(updatedRows), afterUpdate);

  runStatementOnDriver("commit");
  runStatementOnDriver("set autocommit true");
  List<String> afterCommit = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after commit", stringifyValues(updatedRows), afterCommit);
}
/**
 * Within one transaction: insert, update every row, then delete one of the updated rows; checks
 * the data after each step and again after the commit.
 */
@Test
public void testUpdateDeleteOfInserts() throws Exception {
  final String insertInto = "insert into " + Table.ACIDTBL + "(a,b) ";
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";

  int[][] committedRows = {{1,2},{3,4}};
  runStatementOnDriver(insertInto + makeValuesClause(committedRows));
  List<String> afterFirstInsert = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Content didn't match rs0", stringifyValues(committedRows), afterFirstInsert);

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  int[][] txnRows = {{5,6},{7,8}};
  runStatementOnDriver(insertInto + makeValuesClause(txnRows));
  List<String> afterSecondInsert = runStatementOnDriver(selectOrdered);
  List<String> expectedAll = stringifyValues(committedRows);
  expectedAll.addAll(stringifyValues(txnRows));
  Assert.assertEquals("Content didn't match rs1", expectedAll, afterSecondInsert);

  runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
  int[][] updatedRows = {{1,1},{3,1},{5,1},{7,1}};
  List<String> afterUpdate = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after update", stringifyValues(updatedRows), afterUpdate);

  runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1");
  dumpTableData(Table.ACIDTBL, 1, 0);
  dumpTableData(Table.ACIDTBL, 2, 0);
  dumpTableData(Table.ACIDTBL, 2, 2);
  dumpTableData(Table.ACIDTBL, 2, 4);
  int[][] rowsAfterDelete = {{1,1},{3,1},{5,1}};
  List<String> afterDelete = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after delete", stringifyValues(rowsAfterDelete), afterDelete);

  runStatementOnDriver("commit");
  runStatementOnDriver("set autocommit true");
  List<String> afterCommit = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after commit", stringifyValues(rowsAfterDelete), afterCommit);
}
/**
 * Within one transaction: two deletes followed by an update of all remaining rows; checks the
 * data after each step and again after the commit.
 */
@Test
public void testMultipleDelete() throws Exception {
  final String selectOrdered = "select a,b from " + Table.ACIDTBL + " order by a,b";
  final String deleteFrom = "delete from " + Table.ACIDTBL;

  int[][] initialRows = {{1,2},{3,4},{5,6},{7,8}};
  runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(initialRows));
  List<String> afterInsert = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Content didn't match rs0", stringifyValues(initialRows), afterInsert);

  runStatementOnDriver("set autocommit false");
  runStatementOnDriver("START TRANSACTION");
  runStatementOnDriver(deleteFrom + " where b = 8");
  int[][] rowsAfterFirstDelete = {{1,2},{3,4},{5,6}};
  List<String> afterFirstDelete = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after delete", stringifyValues(rowsAfterFirstDelete), afterFirstDelete);

  runStatementOnDriver(deleteFrom + " where b = 4");
  int[][] rowsAfterSecondDelete = {{1, 2}, {5, 6}};
  List<String> afterSecondDelete = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after delete2", stringifyValues(rowsAfterSecondDelete), afterSecondDelete);

  runStatementOnDriver("update " + Table.ACIDTBL + " set b=3");
  dumpTableData(Table.ACIDTBL, 1, 0);
  //nothing actually hashes to bucket0, so update/delete deltas don't have it
  dumpTableData(Table.ACIDTBL, 2, 0);
  dumpTableData(Table.ACIDTBL, 2, 2);
  dumpTableData(Table.ACIDTBL, 2, 4);
  List<String> afterUpdate = runStatementOnDriver(selectOrdered);
  int[][] rowsAfterUpdate = {{1,3},{5,3}};
  Assert.assertEquals("Wrong data after delete", stringifyValues(rowsAfterUpdate), afterUpdate);

  runStatementOnDriver("commit");
  runStatementOnDriver("set autocommit true");
  List<String> afterCommit = runStatementOnDriver(selectOrdered);
  Assert.assertEquals("Wrong data after commit", stringifyValues(rowsAfterUpdate), afterCommit);
}
/**
 * Deletes with an IN sub-query (first against the empty table itself, then against the second
 * ACID table), re-inserts from the second table and checks the resulting rows.
 */
@Test
public void testDeleteIn() throws Exception {
  final String deleteFrom = "delete from " + Table.ACIDTBL;
  final String insertInto = "insert into " + Table.ACIDTBL + "(a,b) ";

  runStatementOnDriver(deleteFrom + " where a IN (SELECT A.a from " + Table.ACIDTBL + "  A)");

  int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
  runStatementOnDriver(insertInto + makeValuesClause(tableData));
  runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)");
  // runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")");
  runStatementOnDriver(deleteFrom + " where a in(select a from " + Table.ACIDTBL2 + ")");
  // runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
  runStatementOnDriver(insertInto + "select a,b from " + Table.ACIDTBL2);

  List<String> actual = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
  int[][] expectedRows = {{1,7},{3,7},{5,2},{5,3}};
  Assert.assertEquals("Bulk update failed", stringifyValues(expectedRows), actual);
}
/**
 * Exercises transaction timeout handling in two phases.
 *
 * Phase 1: opens a transaction, sets HIVE_TXN_TIMEOUT to 2 ms, runs the housekeeper service and
 * expects the next statement to fail with a "Transaction manager has aborted the transaction"
 * message.
 *
 * Phase 2: raises HIVE_TXN_TIMEOUT to 1 s, opens another transaction and checks, across several
 * housekeeper runs separated by 750 ms pauses, that its SHARED_READ lock stays ACQUIRED and that
 * TXN_LAST_HEARTBEAT keeps advancing. After the rollback no locks may remain.
 *
 * The statement order and the pauses are timing-sensitive; do not reorder.
 */
@Test
public void testTimeOutReaper() throws Exception {
runStatementOnDriver("set autocommit false");
runStatementOnDriver("start transaction");
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5");
//make sure currently running txn is considered aborted by housekeeper
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS);
AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
//this will abort the txn
TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
//this should fail because txn aborted due to timeout
CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
//now test that we don't timeout locks we should not
//heartbeater should be running in the background every 1/2 second
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
//hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
runStatementOnDriver("start transaction");
runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
pause(750);
TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
//since there is txn open, we are heartbeating the txn not individual locks
GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
// Two entries are expected; presumably the aborted txn 1 is still listed next to the open
// txn 2 - TODO confirm. The loop below picks the entry whose state is OPEN.
Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
TxnInfo txnInfo = null;
for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) {
if(ti.getState() == TxnState.OPEN) {
txnInfo = ti;
break;
}
}
Assert.assertNotNull(txnInfo);
Assert.assertEquals(2, txnInfo.getId());
Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
// The query result is split on whitespace into exactly two values: TXN_STARTED and TXN_LAST_HEARTBEAT.
String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
String[] vals = s.split("\\s+");
Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
long lastHeartbeat = Long.parseLong(vals[1]);
//these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we
//expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
// Run the housekeeper twice more with pauses in between; the single read lock must still be held each time.
pause(750);
TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
pause(750);
slr = txnHandler.showLocks(new ShowLocksRequest());
Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
pause(750);
TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
slr = txnHandler.showLocks(new ShowLocksRequest());
Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
//should've done several heartbeats
s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
vals = s.split("\\s+");
Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")",
lastHeartbeat < Long.parseLong(vals[1]));
// Ending the transaction must leave no locks behind.
runStatementOnDriver("rollback");
slr = txnHandler.showLocks(new ShowLocksRequest());
Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size());
}
/**
 * Sleeps for the given number of milliseconds. If the thread is interrupted the method returns
 * early and restores the interrupt status, so callers further up can still observe it.
 *
 * @param timeMillis how long to sleep, in milliseconds
 */
private static void pause(int timeMillis) {
  try {
    Thread.sleep(timeMillis);
  }
  catch (InterruptedException e) {
    // Thread.sleep clears the flag when it throws; set it again instead of swallowing it.
    Thread.currentThread().interrupt();
  }
}
/**
 * Converts raw rows into the tab-separated line format they are compared against in the tests
 * (as if from Driver.getResults()). Rows are sorted with {@link RowComp} first; the input array
 * itself is not reordered.
 */
private List<String> stringifyValues(int[][] rowsIn) {
  assert rowsIn.length > 0;
  int[][] sorted = Arrays.copyOf(rowsIn, rowsIn.length);
  Arrays.sort(sorted, new RowComp());
  List<String> lines = new ArrayList<String>();
  for (int r = 0; r < sorted.length; r++) {
    int[] row = sorted[r];
    assert row.length > 0;
    StringBuilder line = new StringBuilder();
    for (int c = 0; c < row.length; c++) {
      line.append(row[c]);
      line.append("\t");
    }
    // drop the tab that follows the last value
    line.deleteCharAt(line.length() - 1);
    lines.add(line.toString());
  }
  return lines;
}
/**
 * Orders int rows column by column: the first column that differs decides; rows with identical
 * columns compare as equal. Both rows are asserted to be non-null and of equal length.
 */
private static final class RowComp implements Comparator<int[]> {
  @Override
  public int compare(int[] row1, int[] row2) {
    assert row1 != null && row2 != null && row1.length == row2.length;
    int result = 0;
    int col = 0;
    while (result == 0 && col < row1.length) {
      result = Integer.compare(row1[col], row2[col]);
      col++;
    }
    return result;
  }
}
/**
 * Builds the VALUES part of an INSERT statement, e.g. {@code values(1,2),(3,4)}.
 * Every row is wrapped in parentheses, including single-column rows: the previous version left
 * the parentheses off for those and produced text such as {@code values1,2}.
 *
 * @param rows the rows to insert; must contain at least one row, and no row may be empty
 * @return the text "values" followed by the comma-separated, parenthesised rows
 */
private String makeValuesClause(int[][] rows) {
  assert rows.length > 0;
  StringBuilder sb = new StringBuilder("values");
  for(int[] row : rows) {
    assert row.length > 0;
    sb.append("(");
    for(int value : row) {
      sb.append(value).append(",");
    }
    sb.setLength(sb.length() - 1);//remove trailing comma
    sb.append("),");
  }
  sb.setLength(sb.length() - 1);//remove trailing comma
  return sb.toString();
}
/**
 * Runs a statement that is expected to succeed and returns whatever the driver hands back via
 * getResults().
 *
 * @throws RuntimeException if the driver reports a non-zero response code
 */
private List<String> runStatementOnDriver(String stmt) throws Exception {
  CommandProcessorResponse response = d.run(stmt);
  if (response.getResponseCode() == 0) {
    List<String> results = new ArrayList<String>();
    d.getResults(results);
    return results;
  }
  throw new RuntimeException(stmt + " failed: " + response);
}
/**
 * Runs a statement that is expected to fail and returns the failure response.
 *
 * @throws RuntimeException if the driver reports a zero (success) response code
 */
private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
  CommandProcessorResponse response = d.run(stmt);
  if (response.getResponseCode() == 0) {
    throw new RuntimeException("Didn't get expected failure!");
  }
  return response;
}
// @Ignore
/**
 * Creates two databases with one partitioned table each, adds a partition to the second table and
 * exchanges it into the first; the test passes if every statement succeeds.
 */
@Test
public void exchangePartition() throws Exception {
  String[] statements = {
      "create database ex1",
      "create database ex2",
      "CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)",
      "CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)",
      "ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')",
      "ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2",
  };
  for (String statement : statements) {
    runStatementOnDriver(statement);
  }
}
}