/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.common.DataCopyStatistics;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreClientWithLocalCache;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.shims.Utils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
/**
* TestReplicationScenariosIncrementalLoadAcidTables - test incremental replication load for ACID tables.
*/
public class TestReplicationScenariosIncrementalLoadAcidTables {
@Rule
public final TestName testName = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class);
static WarehouseInstance primary;
private static WarehouseInstance replica, replicaNonAcid;
private static HiveConf conf;
private String primaryDbName, replicatedDbName, primaryDbNameExtra;
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
internalBeforeClassSetup(overrides, TestReplicationScenariosIncrementalLoadAcidTables.class);
}
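/**
* Brings up a shared MiniDFSCluster and three warehouse instances: the replication source
* (primary), an ACID-enabled replica and a non-ACID replica. Configuration overrides passed
* in by the caller take precedence over the ACID defaults below.
*/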
static void internalBeforeClassSetup(Map<String, String> overrides, Class<?> clazz)
throws Exception {
conf = new HiveConf(clazz);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
HashMap<String, String> acidConfs = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "true");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
put("hive.metastore.client.capability.check", "false");
put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
put("hive.strict.checks.bucketing", "false");
put("hive.mapred.mode", "nonstrict");
put("mapred.input.dir.recursive", "true");
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
put("hive.stats.autogather", "false");
}};
acidConfs.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
acidConfs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
Map<String, String> nonAcidConfs = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "false");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
put("hive.metastore.client.capability.check", "false");
put("hive.stats.autogather", "false");
}};
nonAcidConfs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, nonAcidConfs);
}
@AfterClass
public static void classLevelTearDown() throws IOException {
primary.close();
replica.close();
replicaNonAcid.close();
}
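/**
* Creates the source databases for each test with a timestamped name and marks them as a
* source of replication via the DBPROPERTIES clause.
*/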
@Before
public void setup() throws Throwable {
// Set up the metastore client cache if it is enabled.
if (conf.getBoolVar(HiveConf.ConfVars.MSC_CACHE_ENABLED)) {
HiveMetaStoreClientWithLocalCache.init(conf);
}
primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')");
primaryDbNameExtra = primaryDbName + "_extra";
primary.run("create database " + primaryDbNameExtra + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')");
}
@After
public void tearDown() throws Throwable {
primary.run("drop database if exists " + primaryDbName + " cascade");
replica.run("drop database if exists " + replicatedDbName + " cascade");
replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade");
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}
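/**
* Exercises the full matrix of ACID write operations on the source (insert, delete, update,
* truncate, insert-select, merge, create-as-select, import, insert overwrite, load, union
* insert, multi-statement transactions and alter table) and verifies all of them through a
* single incremental replication load.
*/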
@Test
@org.junit.Ignore("HIVE-25491")
public void testAcidTableIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
List<String> selectStmtList = new ArrayList<>();
List<String[]> expectedValues = new ArrayList<>();
String tableName = testName.getMethodName() + "testInsert";
String tableNameMM = tableName + "_MM";
ReplicationTestUtils.appendInsert(primary, primaryDbName, primaryDbNameExtra, tableName,
tableNameMM, selectStmtList, expectedValues);
appendDelete(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
appendUpdate(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
ReplicationTestUtils.appendTruncate(primary, primaryDbName, primaryDbNameExtra,
selectStmtList, expectedValues);
ReplicationTestUtils.appendInsertIntoFromSelect(primary, primaryDbName, primaryDbNameExtra,
tableName, tableNameMM, selectStmtList, expectedValues);
ReplicationTestUtils.appendMerge(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
ReplicationTestUtils.appendCreateAsSelect(primary, primaryDbName, primaryDbNameExtra, tableName,
tableNameMM, selectStmtList, expectedValues);
ReplicationTestUtils.appendImport(primary, primaryDbName, primaryDbNameExtra, tableName,
tableNameMM, selectStmtList, expectedValues);
ReplicationTestUtils.appendInsertOverwrite(primary, primaryDbName, primaryDbNameExtra, tableName,
tableNameMM, selectStmtList, expectedValues);
ReplicationTestUtils.appendLoadLocal(primary, primaryDbName, primaryDbNameExtra, tableName,
tableNameMM, selectStmtList, expectedValues);
ReplicationTestUtils.appendInsertUnion(primary, primaryDbName, primaryDbNameExtra, tableName,
tableNameMM, selectStmtList, expectedValues);
ReplicationTestUtils.appendMultiStatementTxn(primary, primaryDbName, primaryDbNameExtra,
selectStmtList, expectedValues);
appendMultiStatementTxnUpdateDelete(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
ReplicationTestUtils.appendAlterTable(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
verifyIncrementalLoadInt(selectStmtList, expectedValues, bootStrapDump.lastReplicationId);
}
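// Inserts records into a new table, deletes all of them on the source and queues a
// count(*) == 0 check for the replica.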
private void appendDelete(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra,
List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
String tableName = "testDelete";
ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
deleteRecords(tableName);
selectStmtList.add("select count(*) from " + tableName);
expectedValues.add(new String[] {"0"});
}
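// Inserts records, updates every row with key >= 2 to value 100 on the source and queues
// the corresponding select for verification on the replica.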
private void appendUpdate(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra,
List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
String tableName = "testUpdate";
ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
updateRecords(tableName);
selectStmtList.add("select value from " + tableName + " order by value");
expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
}
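// Same as the delete/update cases above, but the initial inserts are done as part of a
// multi-statement transaction.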
private void appendMultiStatementTxnUpdateDelete(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra,
List<String> selectStmtList, List<String[]> expectedValues)
throws Throwable {
String tableName = "testMultiStatementTxnUpdate";
String tableNameDelete = "testMultiStatementTxnDelete";
String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
String tableProperty = "'transactional'='true'";
String tableStorage = "STORED AS ORC";
ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableName, tableProperty,
tableStorage, resultArray, true);
updateRecords(tableName);
selectStmtList.add("select value from " + tableName + " order by value");
expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableNameDelete, tableProperty,
tableStorage, resultArray, true);
deleteRecords(tableNameDelete);
selectStmtList.add("select count(*) from " + tableNameDelete);
expectedValues.add(new String[] {"0"});
}
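/**
* Verifies change-management (CM) based replication: the tables are dropped on the source
* after the incremental dump, so the load on the replica has to pick the data files up from
* the CM location instead of the original table location.
*/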
@Test
public void testReplCM() throws Throwable {
String tableName = "testcm";
String tableNameMM = tableName + "_MM";
String[] result = new String[]{"5"};
WarehouseInstance.Tuple incrementalDump;
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
incrementalDump = primary.dump(primaryDbName);
primary.run("drop table " + primaryDbName + "." + tableName);
replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableName,
"select count(*) from " + tableName + "_nopart"),
Lists.newArrayList(result, result));
ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
tableNameMM, null, true, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
incrementalDump = primary.dump(primaryDbName);
primary.run("drop table " + primaryDbName + "." + tableNameMM);
replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableNameMM,
"select count(*) from " + tableNameMM + "_nopart"),
Lists.newArrayList(result, result));
}
@Test
public void testReplCommitTransactionOnSourceDeleteORC() throws Throwable {
// Run test with ORC format & with transactional true.
testReplCommitTransactionOnSourceDelete("STORED AS ORC", "'transactional'='true'");
}
@Test
public void testReplCommitTransactionOnSourceDeleteText() throws Throwable {
// Run test with TEXT format & with transactional false.
testReplCommitTransactionOnSourceDelete("STORED AS TEXTFILE", "'transactional'='false'");
}
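/**
* Shared body for the two tests above. Deletes the source data mid-copy (via the
* CopyUtils.testCallable hook, fired at checksum-verification time) to verify that the
* incremental load falls back to the CM location.
*/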
private void testReplCommitTransactionOnSourceDelete(String tableStorage, String tableProperty) throws Throwable {
String tableName = "testReplCommitTransactionOnSourceDelete";
String[] result = new String[] { "5" };
// Do a bootstrap dump.
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName).run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
// Add some data to the table & do an incremental dump.
ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableName, tableProperty, tableStorage,
new String[] { "1", "2", "3", "4", "5" });
WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName);
// Keep a copy of the data before the table is dropped, so that it can be copied back to the
// original location afterwards; this lets us trigger a source-side delete at the time of
// checksum verification.
Path tablePath = new Path(primary.getTable(primaryDbName, tableName).getSd().getLocation());
Path tablePathDupe = new Path(primary.getTable(primaryDbName, tableName).getSd().getLocation() + "_dupe");
FileSystem fs = tablePath.getFileSystem(conf);
DataCopyStatistics copyStatistics = new DataCopyStatistics();
FileUtils.copy(fs, tablePath, fs, tablePathDupe, false, false, conf, copyStatistics);
// Drop the table.
primary.run("drop table " + primaryDbName + "." + tableName);
// Copy the data back to the original location, so that the copy happens from there and not from the CM location.
copyStatistics = new DataCopyStatistics();
FileUtils.copy(fs, tablePathDupe, fs, tablePath, false, false, conf, copyStatistics);
// Register a test hook that deletes the original source data at the time of source checksum verification.
CopyUtils.testCallable = () -> {
try {
fs.delete(tablePath, true);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return null;
};
// Do an incremental load & verify that the data was replicated correctly.
replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableName,
"select count(*) from " + tableName + "_nopart"),
Lists.newArrayList(result, result));
}
private void verifyResultsInReplicaInt(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
ReplicationTestUtils.verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues);
}
private WarehouseInstance.Tuple verifyIncrementalLoadInt(List<String> selectStmtList,
List<String[]> expectedValues, String lastReplId) throws Throwable {
return ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName,
replicatedDbName, selectStmtList, expectedValues, lastReplId);
}
private void deleteRecords(String tableName) throws Throwable {
primary.run("use " + primaryDbName)
.run("delete from " + tableName)
.run("select count(*) from " + tableName)
.verifyResult("0");
}
private void updateRecords(String tableName) throws Throwable {
primary.run("use " + primaryDbName)
.run("update " + tableName + " set value = 100 where key >= 2")
.run("select value from " + tableName + " order by value")
.verifyResults(new String[] {"1", "100", "100", "100", "100"});
}
}