/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_DONT_SET;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUND_THREAD;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DB_PROPERTY;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_FAILOVER_ENDPOINT;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_METRICS_FAILBACK_COUNT;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_METRICS_FAILOVER_COUNT;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_METRICS_LAST_FAILBACK_ENDTIME;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_METRICS_LAST_FAILBACK_STARTTIME;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_METRICS_LAST_FAILOVER_TYPE;
import static org.apache.hadoop.hive.common.repl.ReplConst.TARGET_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.KEY.CURR_STATE_ID_SOURCE;
import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.KEY.CURR_STATE_ID_TARGET;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
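/**
 * Tests for optimised bootstrap, i.e. the reverse replication cycle (B -> A) used after an unplanned
 * failover: the first reverse dump records an event ack, the first reverse load computes a table_diff
 * of the tables modified on the old source, and the second cycle bootstraps exactly those tables while
 * replicating the rest incrementally.
 */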
public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables {
String extraPrimaryDb;
HiveConf primaryConf;
TxnStore txnHandler;
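// Txns opened and lock ids acquired during a test; tearDown() aborts/releases whatever is left over.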
List<Long> tearDownTxns = new ArrayList<>();
List<Long> tearDownLockIds = new ArrayList<>();
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
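// Keep the bootstrap dump's open-txn wait short so tests don't block on deliberately opened transactions.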
overrides.put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
overrides.put("hive.in.repl.test", "true");
internalBeforeClassSetup(overrides, TestReplicationOptimisedBootstrap.class);
}
@Before
public void setup() throws Throwable {
super.setup();
extraPrimaryDb = "extra_" + primaryDbName;
primaryConf = primary.getConf();
txnHandler = TxnUtils.getTxnStore(primary.getConf());
}
@After
public void tearDown() throws Throwable {
if (!tearDownTxns.isEmpty()) {
//Abort any leftover transactions that might not have completed due to test failures.
txnHandler.abortTxns(new AbortTxnsRequest(tearDownTxns));
}
//Release any locks still held from the tests. Locks are normally released explicitly when no longer required,
//but a failed test can leave them dangling.
releaseLocks(txnHandler, tearDownLockIds);
primary.run("drop database if exists " + extraPrimaryDb + " cascade");
super.tearDown();
}
@Test
public void testBuildTableDiffGeneration() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Create two external & two managed tables and do a bootstrap dump & load.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2),(3),(4)")
.run("create external table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('chennai')")
.run("insert into table t2 partition(country='us') values ('new york')")
.run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1_managed values (10)")
.run("insert into table t1_managed values (20),(31),(42)")
.run("create table t2_managed (place string) partitioned by (country string)")
.run("insert into table t2_managed partition(country='india') values ('bangalore')")
.run("insert into table t2_managed partition(country='us') values ('austin')")
.dump(primaryDbName, withClause);
// Do the bootstrap load and check all the external & managed tables are present.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed"})
.verifyReplTargetProperty(replicatedDbName);
// Do an incremental dump & load. Add one table which we can later drop, and an empty table as well.
tuple = primary.run("use " + primaryDbName)
.run("create table t5_managed (id int)")
.run("insert into table t5_managed values (110)")
.run("insert into table t5_managed values (110)")
.run("create table t6_managed (id int)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed", "t5_managed", "t6_managed"})
.verifyReplTargetProperty(replicatedDbName);
// Do some modifications on another database with similar table names & some modifications on the original source
// cluster.
primary.run("create database " + extraPrimaryDb)
.run("use " + extraPrimaryDb)
.run("create external table t1 (id int)")
.run("create table t1_managed (id int)")
.run("use " + primaryDbName)
.run("create external table t4 (id int)")
.run("insert into table t4 values (100)")
.run("insert into table t4 values (201)")
.run("create table t4_managed (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t4_managed values (110)")
.run("insert into table t4_managed values (220)")
.run("insert into table t2 partition(country='france') values ('lyon')")
.run("insert into table t2_managed partition(country='france') values ('nice')")
.run("alter table t6_managed add columns (name string)")
.run("drop table t5_managed");
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// Check that if the dump isn't consumed and we attempt a dump again, it gets skipped and the dump directory
// doesn't change, without any errors.
Path dumpPath = new Path(tuple.dumpLocation);
ContentSummary beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
assertTrue(emptyTuple.dumpLocation.isEmpty());
assertTrue(emptyTuple.lastReplicationId.isEmpty());
ContentSummary afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
// Check the event ack file stays intact, despite having a skipped dump.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// Do a load, this should create a table_diff_complete directory
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the table diff directory exists.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
// Check the table diff has all the modified tables, including the dropped and empty ones.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
.containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
// Do a load again and see that nothing changes, as this load isn't consumed.
beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
primary.load(primaryDbName, replicatedDbName, withClause);
assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
.containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
// Check there are entries in the table files.
assertFalse(getPathsFromTableFile("t4", dumpPath, conf).isEmpty());
assertFalse(getPathsFromTableFile("t2", dumpPath, conf).isEmpty());
assertFalse(getPathsFromTableFile("t4_managed", dumpPath, conf).isEmpty());
assertFalse(getPathsFromTableFile("t2_managed", dumpPath, conf).isEmpty());
// Check the dropped table has no paths recorded, while the empty table has a single entry.
assertTrue(getPathsFromTableFile("t5_managed", dumpPath, conf).isEmpty());
assertEquals(1, getPathsFromTableFile("t6_managed", dumpPath, conf).size());
}
@Test
public void testEmptyDiffForControlFailover() throws Throwable {
// In case of a controlled failover both A & B will be in sync, so the table diff should be created empty, without
// any error.
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Do a bootstrap cycle(A->B)
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// Add some tables & do an incremental dump.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (100),(200)")
.run("insert into table t1 values (12),(35),(46)")
.run("create table t1_managed (id int)")
.run("insert into table t1_managed values (120)")
.run("insert into table t1_managed values (10),(321),(423)")
.dump(primaryDbName, withClause);
// Do an incremental load and see all the tables are there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show tables like 't1_managed'")
.verifyResult("t1_managed");
// Trigger reverse cycle. Do dump on target cluster.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "rev");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump
tuple = replica.dump(replicatedDbName, withClause);
// Even though no diff, the event ack file should be created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// Do a reverse load.
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the table diff directory still gets created.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
// Check the table diff is empty, since both clusters are in sync and no tables got modified.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, tableDiffEntries.size());
}
@Test
public void testFirstIncrementalMandatory() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Create one external and one managed table and do a bootstrap dump.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2),(3),(4)")
.run("create table t1_managed (id int)")
.run("insert into table t1_managed values (10)")
.run("insert into table t1_managed values (20),(31),(42)")
.dump(primaryDbName, withClause);
// Do a bootstrap load and check both managed and external tables are loaded.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show tables like 't1_managed'")
.verifyResult("t1_managed")
.verifyReplTargetProperty(replicatedDbName);
// Trigger reverse dump just after the bootstrap cycle.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
try {
replica.dump(replicatedDbName, withClause);
fail("Expected the dump to fail since the first incremental cycle isn't done yet");
} catch (HiveException he) {
assertTrue(he.getMessage()
.contains("Replication dump not allowed for replicated database with first incremental dump pending : " + replicatedDbName));
}
// Do an incremental cycle and check we don't get this exception.
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// Retrigger the reverse dump; this time it should be successful and the event ack should get created.
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
}
@Test
public void testFailureCasesInTableDiffGeneration() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Do a bootstrap cycle(A->B)
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// Add some tables & do an incremental dump.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("create table t1_managed (id string)")
.run("insert into table t1_managed values ('A')")
.dump(primaryDbName, withClause);
// Do an incremental load and check the tables are there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show tables like 't1_managed'")
.verifyResult("t1_managed")
.verifyReplTargetProperty(replicatedDbName);
// Do some modifications on the source cluster, so we have some entries in the table diff.
primary.run("use " + primaryDbName)
.run("create table t2_managed (id string)")
.run("insert into table t1_managed values ('S')")
.run("insert into table t2_managed values ('A'),('B'),('C')");
// Do some modifications in another database to have unrelated events as well after the last load, which should
// get filtered.
primary.run("create database " + extraPrimaryDb)
.run("use " + extraPrimaryDb)
.run("create external table t1 (id int)")
.run("insert into table t1 values (15),(1),(96)")
.run("create table t1_managed (id string)")
.run("insert into table t1_managed values ('SA'),('PS')");
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "reverse");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Trigger dump on target cluster.
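// A namespace quota of 1 is consumed by the repl directory itself, so creating anything under it
// (including the event ack file) fails immediately.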
replicaFs.setQuota(newReplDir, 1, 10000);
try {
tuple = replica.dump(replicatedDbName, withClause);
fail("Should have failed due to quota violation");
} catch (Exception e) {
// Ignore, expected due to quota violation.
}
// Check the event_ack file doesn't exist.
assertFalse("event ack file exists despite quota violation", replicaFs.listFiles(newReplDir, true).hasNext());
// Set the quota so that the event ack file gets created and the dump fails right after.
replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 3, QUOTA_RESET);
try {
tuple = replica.dump(replicatedDbName, withClause);
fail("Should have failed due to quota violation");
} catch (Exception e) {
// Ignore, expected due to quota violation.
}
// Check the event ack file got created despite the exception and failure.
assertEquals("event_ack", replicaFs.listFiles(newReplDir, true).next().getPath().getName());
// Remove quota for a successful dump
replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
// Retry Dump
tuple = replica.dump(replicatedDbName, withClause);
// Check event ack file is there.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// Set the quota again to interrupt creation of the table diff midway during load.
replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 2, QUOTA_RESET);
try {
primary.load(primaryDbName, replicatedDbName, withClause);
} catch (Exception e) {
// Ignore, expected due to quota violation.
}
// Check the table diff in-progress directory gets created.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
// Check the table diff complete directory doesn't get created.
assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " exists",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
// Set the quota so that the table diff complete directory gets created and the load fails post that.
replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 1, QUOTA_RESET);
try {
primary.load(primaryDbName, replicatedDbName, withClause);
fail("Expected failure due to quota violation");
} catch (Exception e) {
// Ignore, expected due to quota violation.
}
// Check the table diff complete directory gets created.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
// Remove the quota and see everything recovers.
replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the table diff complete directory gets created.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
// Check the table diff in-progress directory isn't there now.
assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " exists",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
// Check the entries in table diff are correct.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
assertTrue("Table Diff Contains " + tableDiffEntries,
tableDiffEntries.containsAll(Arrays.asList("t1_managed", "t2_managed")));
}
@Test
public void testReverseReplicationFailureWhenSourceDbIsDropped() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Do a bootstrap cycle.
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// Create 1 managed table and do a dump & load.
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (1)")
.dump(primaryDbName, withClause);
// Do the load and check the table is present.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.verifyReplTargetProperty(replicatedDbName);
// Suppose the source database got dropped before initiating reverse replication (B -> A).
primary.run("alter database " + primaryDbName + " set dbproperties('repl.source.for'='')")
.run("drop database "+ primaryDbName +" cascade");
// Do some modifications on the target cluster. (t1)
replica.run("use " + replicatedDbName)
.run("insert into table t1 values (101)")
.run("insert into table t1 values (210),(321)");
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump; this should create the event_ack file.
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// This load should throw an exception, since the target database no longer exists.
List<String> finalWithClause = withClause;
assertThrows("Should fail with db doesn't exist exception", SemanticException.class, () -> {
primary.load(primaryDbName, replicatedDbName, finalWithClause);
});
}
@Test
public void testReverseBootstrap() throws Throwable {
List<String> withClause = setUpFirstIterForOptimisedBootstrap();
// Open 3 txns for a database which is not under replication.
int numTxnsForSecDb = 3;
List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
tearDownTxns.addAll(txnsForSecDb);
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
tearDownLockIds.addAll(lockIdsForSecDb);
//Open 2 txns for Primary Db
int numTxnsForPrimaryDb = 2;
List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
tearDownTxns.addAll(txnsForSourceDb);
// Allocate write ids for both tables of source database.
Map<String, Long> tablesInSourceDb = new HashMap<>();
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
txnsForSourceDb, replica.getConf());
tearDownLockIds.addAll(lockIdsForSourceDb);
//Open 1 txn with no hive locks acquired
List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
tearDownTxns.addAll(txnsWithNoLocks);
// Do a second reverse dump; this should do a bootstrap dump for the tables in the table_diff and incremental for
// the rest.
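// The reverse dump is expected to abort every REPL_CREATED txn on the old source before flipping direction.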
List<Long> allReplCreatedTxnsOnSource = getReplCreatedTxns();
tearDownTxns.addAll(allReplCreatedTxnsOnSource);
assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
verifyAllOpenTxnsAborted(allReplCreatedTxnsOnSource, primaryConf);
//Verify that openTxns for sourceDb were aborted before proceeding with bootstrap dump.
verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
releaseLocks(txnHandler, lockIdsForSecDb);
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// The _bootstrap directory should be created since bootstrap is enabled on external tables.
Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME + "/" + EximUtil.METADATA_PATH_NAME + "/" + replicatedDbName);
FileStatus[] listStatus = dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
ArrayList<String> tablesBootstrapped = new ArrayList<>();
for (FileStatus file : listStatus) {
tablesBootstrapped.add(file.getPath().getName());
}
assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
// Get source or replica database properties and verify replication metrics properties
Map<String, String> sourceParams = replica.getDatabase(replicatedDbName).getParameters();
verifyReplicationMetricsStatistics(sourceParams, 1, 1, ReplConst.FailoverType.UNPLANNED.toString());
// Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
primary.load(primaryDbName, replicatedDbName, withClause);
assertFalse("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
primary.run("use " + primaryDbName)
.run("select id from t1")
.verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
.run("select place from t2 where country = 'india'")
.verifyResults(new String[] { "delhi", "chennai" })
.run("select place from t2 where country = 'france'")
.verifyFailure(new String[] { "lyon" })
.run("select id from t3")
.verifyResults(new String[] { "10", "20", "31", "42", "11" })
.run("select place from t4 where country = 'india'")
.verifyResults(new String[] { "bangalore", "lucknow" })
.run("select place from t5 where country = 'china'")
.verifyResults(new String[] { "beejing" });
// Get target or primary database properties and verify replication metrics properties
Map<String, String> targetParams = primary.getDatabase(primaryDbName).getParameters();
verifyReplicationMetricsStatistics(targetParams, 1, 1, ReplConst.FailoverType.UNPLANNED.toString());
// Check the properties on the new target database.
assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
// Check the properties on the new source database.
assertFalse(sourceParams.containsKey(TARGET_OF_REPLICATION));
assertFalse(sourceParams.containsKey(CURR_STATE_ID_TARGET.toString()));
assertFalse(sourceParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
assertFalse(sourceParams.containsKey(REPL_TARGET_DB_PROPERTY));
assertTrue(sourceParams.containsKey(SOURCE_OF_REPLICATION));
assertFalse(sourceParams.containsKey(ReplConst.REPL_ENABLE_BACKGROUND_THREAD));
// Proceed with the normal incremental flow once optimised bootstrap is over.
replica.run("use " + replicatedDbName)
.run("insert into table t1 values (98)")
.run("insert into table t2 partition(country='england') values ('london')")
.run("insert into table t2 partition(country='india') values ('jaipur')")
.run("insert into table t3 values (15),(16)")
.run("drop table t4")
.run("insert into table t5 partition(country='china') values ('chengdu')")
.dump(replicatedDbName, withClause);
// Do the load and check the data gets loaded.
primary.load(primaryDbName, replicatedDbName, withClause)
.run("select id from t1")
.verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321", "98" })
.run("select place from t2 where country = 'england'")
.verifyResults(new String[] { "london" })
.run("select place from t2 where country = 'india'")
.verifyResults(new String[] { "delhi", "chennai", "jaipur" })
.run("select id from t3")
.verifyResults(new String[] { "10", "20", "31", "42", "11", "15", "16" })
.run("show tables like 't4'")
.verifyFailure(new String[]{"t4"})
.run("select place from t5 where country = 'china'")
.verifyResults(new String[] { "beejing", "chengdu" });
}
@Test
public void testReverseBootstrapWithFailedIncremental() throws Throwable {
List<String> withClause = setUpFirstIterForOptimisedBootstrap();
WarehouseInstance.Tuple tuple;
// Do a second reverse dump; this should do a bootstrap dump for the tables in the table_diff and incremental for
// the rest.
Path replDir = new Path(replica.repldDir + "1");
DistributedFileSystem dfs = (DistributedFileSystem) replDir.getFileSystem(replica.getConf());
QuotaUsage quotaUsage = dfs.getQuotaUsage(replDir);
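// Leave room for only a few more namespace entries so the dump starts but fails midway through.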
dfs.setQuota(replDir, quotaUsage.getFileAndDirectoryCount() + 4, QUOTA_DONT_SET);
try {
tuple = replica.dump(replicatedDbName, withClause);
fail("Expected the dump to fail due to quota violation in middle");
} catch (Exception e) {
// expected
}
// Remove the quota & retry the dump, this time it should be successful
dfs.setQuota(replDir, QUOTA_RESET, QUOTA_RESET);
tuple = replica.dump(replicatedDbName, withClause);
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// The _bootstrap directory should be created since bootstrap is enabled on external tables.
Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME + "/metadata/" + replicatedDbName);
FileStatus[] listStatus = dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
ArrayList<String> tablesBootstrapped = new ArrayList<>();
for (FileStatus file : listStatus) {
tablesBootstrapped.add(file.getPath().getName());
}
assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
// Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
dfs.setQuota(replDir, quotaUsage.getFileAndDirectoryCount() + 1, QUOTA_DONT_SET);
try {
primary.load(primaryDbName, replicatedDbName, withClause);
fail("Expected incremental load to fail due to quota violation");
} catch (Exception e) {
// expected
}
// Reset the quota & retry load
dfs.setQuota(replDir, QUOTA_RESET, QUOTA_RESET);
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the data is correct
primary.run("use " + primaryDbName)
.run("select id from t1")
.verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
.run("select place from t2 where country = 'india'")
.verifyResults(new String[] { "delhi", "chennai" })
.run("select place from t2 where country = 'france'")
.verifyFailure(new String[] { "lyon" })
.run("select id from t3")
.verifyResults(new String[] { "10", "20", "31", "42", "11" })
.run("select place from t4 where country = 'india'")
.verifyResults(new String[] { "bangalore", "lucknow" })
.run("select place from t5 where country = 'china'")
.verifyResults(new String[] { "beejing" });
// Check the correct db properties are set.
Map<String, String> targetParams = primary.getDatabase(primaryDbName).getParameters();
Map<String, String> sourceParams = replica.getDatabase(replicatedDbName).getParameters();
// Check the properties on the new target database.
assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
assertTrue(targetParams.containsKey(CURR_STATE_ID_TARGET.toString()));
assertTrue(targetParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
// Check the properties on the new source database.
assertFalse(sourceParams.containsKey(TARGET_OF_REPLICATION));
assertFalse(sourceParams.containsKey(CURR_STATE_ID_TARGET.toString()));
assertFalse(sourceParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
assertFalse(sourceParams.containsKey(REPL_TARGET_DB_PROPERTY));
assertTrue(sourceParams.containsKey(SOURCE_OF_REPLICATION));
// Proceed with the normal incremental flow once optimised bootstrap is over.
replica.run("use " + replicatedDbName)
.run("insert into table t1 values (98)")
.run("insert into table t2 partition(country='england') values ('london')")
.run("insert into table t2 partition(country='india') values ('jaipur')")
.run("insert into table t3 values (15),(16)")
.run("drop table t4")
.run("insert into table t5 partition(country='china') values ('chengdu')")
.dump(replicatedDbName, withClause);
// Do the load and check the data gets loaded.
primary.load(primaryDbName, replicatedDbName, withClause)
.run("select id from t1")
.verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321", "98" })
.run("select place from t2 where country = 'england'")
.verifyResults(new String[] { "london" })
.run("select place from t2 where country = 'india'")
.verifyResults(new String[] { "delhi", "chennai", "jaipur" })
.run("select id from t3")
.verifyResults(new String[] { "10", "20", "31", "42", "11", "15", "16" })
.run("show tables like 't4'")
.verifyFailure(new String[]{"t4"})
.run("select place from t5 where country = 'china'")
.verifyResults(new String[] { "beejing", "chengdu" });
}
@Test
public void testOverwriteDuringBootstrap() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Do a bootstrap cycle.
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// Create some partitioned and non-partitioned tables and do a dump & load.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2),(3),(4)")
.run("create table t2 (id int)")
.run("insert into table t2 values (15),(16)")
.run("create table t3 (place string) partitioned by (country string)")
.run("insert into table t3 partition(country='india') values ('chennai')")
.run("insert into table t3 partition(country='us') values ('new york')")
.run("create table t4 (place string) partitioned by (country string)")
.run("insert into table t4 partition(country='china') values ('beejing')")
.run("insert into table t4 partition(country='nepal') values ('kathmandu')")
.dump(primaryDbName, withClause);
// Do the load and check all the external & managed tables are present.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1", "t2", "t3", "t4"})
.verifyReplTargetProperty(replicatedDbName);
// Prepare for reverse bootstrap.
// Do some modifications on the original source cluster. The diff becomes (tnew_managed, t1, t2, t3)
// Create one new table: it should get dropped. (tnew_managed)
// Create a new partition: the new partition should get dropped. (t3: france)
// Modify a table: the data should get overwritten. (t1)
// Modify a partition: the data should be overwritten. (t3: india value delhi)
// Drop a table: the table should be recreated. (t2)
primary.run("use " + primaryDbName)
.run("create table tnew_managed (id int)")
.run("insert into table t1 values (25)")
.run("insert into table tnew_managed values (110)")
.run("insert into table t3 partition(country='france') values ('lyon')")
.run("insert into table t3 partition(country='india') values ('delhi')")
.run("drop table t2");
// Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t2, t4, t5: incremental)
replica.run("use " + replicatedDbName)
.run("insert into table t1 values (101)")
.run("insert into table t1 values (121),(211)")
.run("insert into table t3 partition(country='india') values ('lucknow')")
.run("insert into table t2 values (11)")
.run("insert into table t4 partition(country='india') values ('kanpur')")
.run("create table t5 (place string) partitioned by (country string)")
.run("insert into table t5 partition(country='china') values ('beejing')")
.run("insert into table t4 partition(country='china') values ('Shanghai')");
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
Path dumpPath = new Path(tuple.dumpLocation);
// Do a load, this should create a table_diff_complete directory
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the table diff directory exists.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
// Check the table diff has all the modified tables, including the dropped and empty ones.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
.containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
// Do a second reverse dump; this should do a bootstrap dump for the tables in the table_diff and incremental for
// the rest.
tuple = replica.dump(replicatedDbName, withClause);
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// The _bootstrap directory should be created since bootstrap is enabled on external tables.
Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME + "/metadata/" + replicatedDbName);
FileStatus[] listStatus = dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
ArrayList<String> tablesBootstrapped = new ArrayList<>();
for (FileStatus file : listStatus) {
tablesBootstrapped.add(file.getPath().getName());
}
assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
// Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
primary.load(primaryDbName, replicatedDbName, withClause);
primary.run("use " + primaryDbName)
.run("select id from t1")
.verifyResults(new String[] { "1", "2", "3", "4", "101", "121", "211" })
.run("select id from t2")
.verifyResults(new String[] { "15", "16", "11" })
.run("select place from t3 where country = 'india'")
.verifyResults(new String[] {"chennai", "lucknow" })
.run("select place from t3 where country = 'us'")
.verifyResults(new String[] {"new york" })
.run("select place from t3 where country = 'france'")
.verifyFailure(new String[] { "lyon" })
.run("select place from t4 where country = 'china'")
.verifyResults(new String[] { "beejing", "Shanghai" })
.run("select place from t4 where country = 'india'")
.verifyResults(new String[] { "kanpur" })
.run("select place from t5 where country = 'china'")
.verifyResults(new String[] { "beejing" })
.run("show tables like 'tnew_managed'")
.verifyFailure(new String[]{"tnew_managed"});
}
@Test
public void testTblMetricRegisterDuringSecondCycleOfOptimizedBootstrap() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1_managed values (10)")
.run("insert into table t1_managed values (20),(31),(42)")
.dump(primaryDbName, withClause);
// Do the bootstrap load and check the managed table is present.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1_managed"})
.verifyReplTargetProperty(replicatedDbName);
// Do an incremental dump & load, Add one table which we can drop & an empty table as well.
tuple = primary.run("use " + primaryDbName)
.run("create table t2_managed (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t2_managed values (10)")
.run("insert into table t2_managed values (20),(31),(42)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1_managed", "t2_managed"})
.verifyReplTargetProperty(replicatedDbName);
primary.run("use " + primaryDbName)
.run("insert into table t1_managed values (30)")
.run("insert into table t1_managed values (50),(51),(52)");
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(false);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// Do a load, this should create a table_diff_complete directory
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the table diff directory exists.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
Path dumpPath = new Path(tuple.dumpLocation);
// Check the table diff has the modified table.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
.containsAll(Arrays.asList("t1_managed")));
isMetricsEnabledForTests(true);
replica.dump(replicatedDbName, withClause);
MetricCollector collector = MetricCollector.getInstance();
ReplicationMetric metric = collector.getMetrics().getLast();
Stage stage = metric.getProgress().getStageByName("REPL_DUMP");
Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
assertEquals(tableDiffEntries.size(), tableMetric.getTotalCount());
}
@Test
public void testTblMetricRegisterDuringSecondLoadCycleOfOptimizedBootstrap() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1_managed values (10)")
.run("insert into table t1_managed values (20),(31),(42)")
.dump(primaryDbName, withClause);
// Do the bootstrap load and check the managed table is present.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1_managed"})
.verifyReplTargetProperty(replicatedDbName);
// Do an incremental dump & load, Add one table which we can drop & an empty table as well.
tuple = primary.run("use " + primaryDbName)
.run("create table t2_managed (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t2_managed values (10)")
.run("insert into table t2_managed values (20),(31),(42)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"t1_managed", "t2_managed"})
.verifyReplTargetProperty(replicatedDbName);
primary.run("use " + primaryDbName)
.run("insert into table t1_managed values (30)")
.run("insert into table t1_managed values (50),(51),(52)");
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(false);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
// Do a load, this should create a table_diff_complete directory
primary.load(primaryDbName, replicatedDbName, withClause);
// Check the table diff directory exists.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
Path dumpPath = new Path(tuple.dumpLocation);
// Check the table diff has the modified table.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
.containsAll(Arrays.asList("t1_managed")));
isMetricsEnabledForTests(true);
replica.dump(replicatedDbName, withClause);
// Do a load on primary and verify the insert queries are discarded.
primary.load(primaryDbName, replicatedDbName, withClause)
.run("select id from t1_managed")
.verifyResults(new String[] { "10", "20", "31", "42" });
MetricCollector collector = MetricCollector.getInstance();
ReplicationMetric metric = collector.getMetrics().getLast();
Stage stage = metric.getProgress().getStageByName("REPL_LOAD");
Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
assertEquals(tableDiffEntries.size(), tableMetric.getTotalCount());
}
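/**
 * Runs the first iteration of an optimised bootstrap scenario: a forward bootstrap cycle (A -> B),
 * transactions opened and modifications made on both clusters, then the first reverse dump and load
 * (B -> A), which create the event ack file, the abort-txns file and the table_diff. Returns the
 * with-clause pointing at the reverse dump directory, ready for the second (bootstrap) cycle.
 */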
@NotNull
private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
// Do a bootstrap cycle.
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// Open 3 txns for a database which is not under replication.
int numTxnsForSecDb = 3;
List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
tearDownTxns.addAll(txnsForSecDb);
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb);
tablesInSecDb.put("t2", (long) numTxnsForSecDb);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
tearDownLockIds.addAll(lockIdsForSecDb);
//Open 2 txns for Primary Db
int numTxnsForPrimaryDb = 2;
List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
tearDownTxns.addAll(txnsForSourceDb);
// Allocate write ids for both tables of source database.
Map<String, Long> tablesInSourceDb = new HashMap<>();
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
txnsForSourceDb, primary.getConf());
tearDownLockIds.addAll(lockIdsForSourceDb);
//Open 1 txn with no hive locks acquired
List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
tearDownTxns.addAll(txnsWithNoLocks);
// Create 4 managed tables and do a dump & load.
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2),(3),(4)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('chennai')")
.run("insert into table t2 partition(country='us') values ('new york')").run("create table t3 (id int)")
.run("insert into table t3 values (10)").run("insert into table t3 values (20),(31),(42)")
.run("create table t4 (place string) partitioned by (country string)")
.run("insert into table t4 partition(country='india') values ('bangalore')")
.run("insert into table t4 partition(country='us') values ('austin')").dump(primaryDbName, withClause);
// Do the load and check all the tables are present.
replica.load(replicatedDbName, primaryDbName, withClause).run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName).run("show tables like 't1'")
.verifyResult("t1").run("show tables like 't2'").verifyResult("t2").run("show tables like 't3'")
.verifyResult("t3").run("show tables like 't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName);
String forwardReplPolicy = HiveUtils.getReplPolicy(replicatedDbName);
List<Long> targetReplCreatedTxnIds = new ArrayList<>();
for (Long txn: txnsForSecDb) {
targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
}
for (Long txn: txnsForSourceDb) {
targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
}
for (Long txn: txnsWithNoLocks) {
targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
}
verifyAllOpenTxnsNotAborted(targetReplCreatedTxnIds, primaryConf);
//Open new transactions on the original source cluster after it went down.
// Open 1 txn for secondary Database
List<Long> newTxnsForSecDb = openTxns(1, txnHandler, primaryConf);
tearDownTxns.addAll(newTxnsForSecDb);
Map<String, Long> newTablesForSecDb = new HashMap<>();
newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
tearDownLockIds.addAll(newLockIdsForSecDb);
//Open 1 txn for Primary Db
List<Long> newTxnsForSourceDb = openTxns(1, txnHandler, primaryConf);
tearDownTxns.addAll(newTxnsForSourceDb);
// Allocate write ids for both tables of source database.
Map<String, Long> newTablesInSourceDb = new HashMap<>();
newTablesInSourceDb.put("t1", (long) 5);
newTablesInSourceDb.put("t5", (long) 3);
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
newTxnsForSourceDb, primary.getConf());
tearDownLockIds.addAll(newLockIdsForSourceDb);
//Open 1 txn with no hive locks acquired
List<Long> newTxnsWithNoLock = openTxns(1, txnHandler, primaryConf);
tearDownTxns.addAll(newTxnsWithNoLock);
// Do some modifications on the original source cluster. The diff becomes (tnew_managed, t1, t2, t3)
primary.run("use " + primaryDbName).run("create table tnew_managed (id int) clustered by(id) into 3 buckets " +
"stored as orc tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (25)").run("insert into table tnew_managed values (110)")
.run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3")
.run("alter database "+ primaryDbName + " set DBPROPERTIES ('key1'='value1')");
assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
// Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
replica.run("use " + replicatedDbName).run("insert into table t1 values (101)")
.run("insert into table t1 values (210),(321)")
.run("insert into table t2 partition(country='india') values ('delhi')").run("insert into table t3 values (11)")
.run("insert into table t4 partition(country='india') values ('lucknow')")
.run("create table t5 (place string) partitioned by (country string)")
.run("insert into table t5 partition(country='china') values ('beejing')")
.run("alter database "+ replicatedDbName + " set DBPROPERTIES ('" +
ReplConst.REPL_ENABLE_BACKGROUND_THREAD + "'='true')");
assertTrue (MetaStoreUtils.isBackgroundThreadsEnabledForRepl(replica.getDatabase(replicatedDbName)));
// Prepare for reverse replication.
DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
Path newReplDir = new Path(replica.repldDir + "1");
replicaFs.mkdirs(newReplDir);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
// Do a reverse dump
tuple = replica.dump(replicatedDbName, withClause);
// Check the event ack file got created.
assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
Path dumpPath = new Path(tuple.dumpLocation);
// Do a load, this should create a table_diff_complete directory
primary.load(primaryDbName, replicatedDbName, withClause);
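// Only the txns that acquired write ids/locks on the old source db should get aborted by the load;
// txns on other dbs or with no hive locks must remain open.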
verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
verifyAllOpenTxnsAborted(newTxnsForSourceDb, primaryConf);
verifyAllOpenTxnsNotAborted(newTxnsForSecDb, primaryConf);
verifyAllOpenTxnsNotAborted(newTxnsWithNoLock, primaryConf);
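// Clean up the remaining open txns and release their locks.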
txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
releaseLocks(txnHandler, lockIdsForSecDb);
txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
txnHandler.abortTxns(new AbortTxnsRequest(newTxnsForSecDb));
releaseLocks(txnHandler, newLockIdsForSecDb);
txnHandler.abortTxns(new AbortTxnsRequest(newTxnsWithNoLock));
// Check the table diff directory exists.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
assertTrue(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE).toString() + " doesn't exist",
replicaFs.exists(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE)));
List<Long> txnsInAbortTxnFile =
OptimisedBootstrapUtils.getTxnIdFromAbortTxnsFile(new Path(tuple.dumpLocation), primaryConf);
assertTrue(txnsInAbortTxnFile.containsAll(txnsForSourceDb));
assertTrue(txnsInAbortTxnFile.containsAll(txnsForSecDb));
assertTrue(txnsInAbortTxnFile.containsAll(txnsWithNoLocks));
assertEquals(txnsForSecDb.size() + txnsForSourceDb.size() + txnsWithNoLocks.size(), txnsInAbortTxnFile.size());
// Check the table diff has all the modified tables, including the dropped and empty ones.
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
assertTrue("Table Diff Contains " + tableDiffEntries,
tableDiffEntries.containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
return withClause;
}
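// Returns the currently open REPL_CREATED txns by excluding every other txn type from the open-txn query.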
List<Long> getReplCreatedTxns() throws MetaException {
List<TxnType> txnListExcludingReplCreated = new ArrayList<>();
for (TxnType type : TxnType.values()) {
// exclude every type except REPL_CREATED, so only repl-created txns are returned
if (type != TxnType.REPL_CREATED) {
txnListExcludingReplCreated.add(type);
}
}
return txnHandler.getOpenTxns(txnListExcludingReplCreated).getOpen_txns();
}
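// Checks that the db properties recorded during the first cycle of optimized bootstrap are present
// after the first load and cleared once the second cycle completes.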
@Test
public void testDbParametersAfterOptimizedBootstrap() throws Throwable {
List<String> withClause = Arrays.asList(
String.format("'%s'='%s'", HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false"),
String.format("'%s'='%s'", HiveConf.ConfVars.HIVE_REPL_FAILOVER_START.varname, "true")
);
// bootstrap
primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (1),(2)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// incremental
primary.run("use " + primaryDbName)
.run("insert into table t1 values (3)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
// make some changes on primary
primary.run("use " + primaryDbName)
.run("create table t2(name string) stored as orc tblproperties(\"transactional\"=\"true\")")
.run("insert into t2 values('a')");
withClause = Arrays.asList(
String.format("'%s'='%s'", HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false")
);
// 1st cycle of optimized bootstrap
replica.dump(replicatedDbName, withClause);
primary.load(primaryDbName, replicatedDbName, withClause);
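// Bookkeeping properties expected to be set on the replica db after the first cycle.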
String[] dbParams = new String[]{
TARGET_OF_REPLICATION,
CURR_STATE_ID_SOURCE.toString(),
CURR_STATE_ID_TARGET.toString(),
REPL_TARGET_DB_PROPERTY,
REPL_ENABLE_BACKGROUND_THREAD
};
// verify that all db parameters are set after the first cycle
for (String paramKey : dbParams) {
assertTrue(replica.getDatabase(replicatedDbName).getParameters().containsKey(paramKey));
}
// 2nd cycle of optimized bootstrap
replica.dump(replicatedDbName, withClause);
primary.load(primaryDbName, replicatedDbName, withClause);
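// The second cycle should clear all the bookkeeping properties.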
for (String paramKey : dbParams) {
assertFalse(replica.getDatabase(replicatedDbName).getParameters().containsKey(paramKey));
}
// ensure optimized bootstrap was successful.
primary.run(String.format("select * from %s.t1", primaryDbName))
.verifyResults(new String[]{"1", "2", "3"})
.run("show tables in "+primaryDbName)
.verifyResults(new String[]{"t1"});
}
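// Exercises controlled failovers in both directions, checking the event ack file, the failover-ready
// markers, the table diff contents and the failover/failback metrics after each cycle.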
@Test
public void testReverseFailoverBeforeOptimizedBootstrap() throws Throwable {
// Do bootstrap dump and load
primary.run("use " + primaryDbName)
.run("create table t1 (id string)")
.run("insert into table t1 values ('A')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
// Do incremental dump and load
primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
// Initiate a controlled failover from primary to replica.
List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
primary.dump(primaryDbName, failoverConfigs);
replica.load(replicatedDbName, primaryDbName, failoverConfigs);
// Modify primary after failover.
primary.run("use " + primaryDbName)
.run("insert into t1 values('B')");
// Initiate a controlled failover from replica to primary before the first cycle of optimized bootstrap runs.
WarehouseInstance.Tuple reverseDump = replica.run("use " + replicatedDbName)
.run("create table t2 (col int)")
.run("insert into t2 values(1),(2)")
.dump(replicatedDbName, failoverConfigs);
// the first reverse dump should NOT be failover ready.
FileSystem fs = new Path(reverseDump.dumpLocation).getFileSystem(conf);
assertTrue(fs.exists(new Path(reverseDump.dumpLocation, EVENT_ACK_FILE)));
Path dumpPath = new Path(reverseDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
// ensure load was successful.
primary.load(primaryDbName, replicatedDbName, failoverConfigs);
assertTrue(fs.exists(new Path(reverseDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(reverseDump.dumpLocation), conf);
assertFalse(tableDiffEntries.isEmpty()); // we modified table t1 at the source
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
// Do a second dump; this dump should NOT be failover ready as some tables need to be bootstrapped (here it is t1).
reverseDump = replica.dump(replicatedDbName, failoverConfigs);
assertTrue(fs.exists(new Path(reverseDump.dumpLocation, OptimisedBootstrapUtils.BOOTSTRAP_TABLES_LIST)));
dumpPath = new Path(reverseDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
primary.load(primaryDbName, replicatedDbName, failoverConfigs);
// Ensure optimized bootstrap was successful.
primary.run(String.format("select * from %s.t1", primaryDbName))
.verifyResults(new String[]{"A"})
.run(String.format("select * from %s.t2", primaryDbName))
.verifyResults(new String[]{"1", "2"});
assertFalse(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
assertFalse(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
// Get source and target database properties after optimised bootstrap
Map<String, String> sourceParams = replica.getDatabase(replicatedDbName).getParameters();
Map<String, String> targetParams = primary.getDatabase(primaryDbName).getParameters();
// verify failover/failback metrics are set properly for the source db after optimised bootstrap
verifyReplicationMetricsStatistics(sourceParams, 1, 2, ReplConst.FailoverType.PLANNED.toString());
// verify failover/failback metrics are set properly for the target db after optimised bootstrap
verifyReplicationMetricsStatistics(targetParams, 1, 1, ReplConst.FailoverType.PLANNED.toString());
// Do a third dump; this should be failover ready.
reverseDump = replica.dump(replicatedDbName, failoverConfigs);
dumpPath = new Path(reverseDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
primary.load(primaryDbName, replicatedDbName, failoverConfigs);
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
sourceParams = replica.getDatabase(replicatedDbName).getParameters();
targetParams = primary.getDatabase(primaryDbName).getParameters();
// verify failover/failback metrics are set properly for the source db after optimised bootstrap
verifyReplicationMetricsStatistics(sourceParams, 1, 3, ReplConst.FailoverType.PLANNED.toString());
// verify failover/failback metrics are set properly for the target db after optimised bootstrap
verifyReplicationMetricsStatistics(targetParams, 1, 2, ReplConst.FailoverType.PLANNED.toString());
// Initiate a failover from primary to replica.
WarehouseInstance.Tuple forwardDump = primary.dump(primaryDbName, failoverConfigs);
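// The first forward dump after failback behaves like the first cycle of optimized bootstrap:
// only the event ack file is present and the dump is not failover ready.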
assertTrue(fs.exists(new Path(forwardDump.dumpLocation, EVENT_ACK_FILE)));
dumpPath = new Path(forwardDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
replica.load(replicatedDbName, primaryDbName, failoverConfigs);
assertTrue(fs.exists(new Path(forwardDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
tableDiffEntries = getTablesFromTableDiffFile(new Path(forwardDump.dumpLocation), conf);
assertTrue(tableDiffEntries.isEmpty()); // nothing was modified
// Here the second dump will be failover ready, since no tables need to be bootstrapped.
forwardDump = primary.dump(primaryDbName, failoverConfigs);
assertTrue(fs.exists(new Path(forwardDump.dumpLocation, OptimisedBootstrapUtils.BOOTSTRAP_TABLES_LIST)));
dumpPath = new Path(forwardDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
replica.load(replicatedDbName, primaryDbName, failoverConfigs);
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
sourceParams = replica.getDatabase(replicatedDbName).getParameters();
targetParams = primary.getDatabase(primaryDbName).getParameters();
// verify failover/failback metrics are set properly for the source db after optimised bootstrap
verifyReplicationMetricsStatistics(sourceParams, 2, 4, ReplConst.FailoverType.PLANNED.toString());
// verify failover/failback metrics are set properly for the target db after optimised bootstrap
verifyReplicationMetricsStatistics(targetParams, 2, 3, ReplConst.FailoverType.PLANNED.toString());
}
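// Runs optimized bootstrap after a controlled failover (without initiating a failback), checking that
// REPL_FAILOVER_ENDPOINT is set on both dbs during the cycles and cleared once bootstrap completes.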
@Test
public void testOptimizedBootstrapWithControlledFailover() throws Throwable {
primary.run("use " + primaryDbName)
.run("create table t1 (id string)")
.run("insert into table t1 values ('A')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
// Initiate a controlled failover from primary to replica.
List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
primary.dump(primaryDbName, failoverConfigs);
replica.load(replicatedDbName, primaryDbName, failoverConfigs);
primary.run("use " + primaryDbName)
.run("create table t3 (id int)")
.run("insert into t3 values(1),(2),(3)")
.run("insert into t1 values('B')"); //modify primary after failover.
// initiate first cycle of optimized bootstrap
WarehouseInstance.Tuple reverseDump = replica.run("use " + replicatedDbName)
.run("create table t2 (col int)")
.run("insert into t2 values(1),(2)")
.dump(replicatedDbName);
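// The first cycle of the reverse dump should create the event ack file.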
FileSystem fs = new Path(reverseDump.dumpLocation).getFileSystem(conf);
assertTrue(fs.exists(new Path(reverseDump.dumpLocation, EVENT_ACK_FILE)));
primary.load(primaryDbName, replicatedDbName);
assertEquals(MetaStoreUtils.FailoverEndpoint.SOURCE.toString(),
primary.getDatabase(primaryDbName).getParameters().get(REPL_FAILOVER_ENDPOINT));
assertEquals(MetaStoreUtils.FailoverEndpoint.TARGET.toString(),
replica.getDatabase(replicatedDbName).getParameters().get(REPL_FAILOVER_ENDPOINT));
assertTrue(fs.exists(new Path(reverseDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(reverseDump.dumpLocation), conf);
assertFalse(tableDiffEntries.isEmpty());
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
// second cycle of optimized bootstrap
reverseDump = replica.dump(replicatedDbName);
assertTrue(fs.exists(new Path(reverseDump.dumpLocation, OptimisedBootstrapUtils.BOOTSTRAP_TABLES_LIST)));
primary.load(primaryDbName, replicatedDbName);
// Ensure optimized bootstrap was successful.
primary.run(String.format("select * from %s.t1", primaryDbName))
.verifyResults(new String[]{"A"})
.run(String.format("select * from %s.t2", primaryDbName))
.verifyResults(new String[]{"1", "2"})
.run("show tables in " + primaryDbName)
.verifyResults(new String[]{"t1", "t2"});
assertFalse(primary.getDatabase(primaryDbName).getParameters().containsKey(REPL_FAILOVER_ENDPOINT));
assertFalse(replica.getDatabase(replicatedDbName).getParameters().containsKey(REPL_FAILOVER_ENDPOINT));
}
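// Asserts the failover/failback metric db properties: the last failover type, the failover and
// failback counters, and non-zero failback start/end timestamps.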
private void verifyReplicationMetricsStatistics(Map<String, String> dbParams, int expectedFailbackCount,
int expectedFailoverCount, String expectedFailoverType) {
// verify failover metrics
assertTrue(dbParams.containsKey(ReplConst.REPL_METRICS_LAST_FAILOVER_TYPE));
String failoverType = dbParams.get(ReplConst.REPL_METRICS_LAST_FAILOVER_TYPE);
assertEquals(expectedFailoverType, failoverType);
assertTrue(dbParams.containsKey(ReplConst.REPL_METRICS_FAILOVER_COUNT));
String failoverCount = dbParams.get(ReplConst.REPL_METRICS_FAILOVER_COUNT);
assertEquals(expectedFailoverCount, NumberUtils.toInt(failoverCount, 0));
// verify failback metrics
assertTrue(dbParams.containsKey(ReplConst.REPL_METRICS_LAST_FAILBACK_STARTTIME));
String failbackStartTime = dbParams.get(ReplConst.REPL_METRICS_LAST_FAILBACK_STARTTIME);
assertNotEquals(0, NumberUtils.toLong(failbackStartTime, 0));
assertTrue(dbParams.containsKey(ReplConst.REPL_METRICS_FAILBACK_COUNT));
String failbackCount = dbParams.get(ReplConst.REPL_METRICS_FAILBACK_COUNT);
assertEquals(expectedFailbackCount, NumberUtils.toInt(failbackCount, 0));
assertTrue(dbParams.containsKey(ReplConst.REPL_METRICS_LAST_FAILBACK_ENDTIME));
String failbackEndTime = dbParams.get(ReplConst.REPL_METRICS_LAST_FAILBACK_ENDTIME);
assertNotEquals(0, NumberUtils.toLong(failbackEndTime, 0));
}
}