/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.Ignore;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
/**
* TestScheduledReplicationScenarios - tests for scheduled replication, i.e. REPL DUMP and
* REPL LOAD executed through scheduled queries.
*/
public class TestScheduledReplicationScenarios extends BaseReplicationScenariosAcidTables {
private static final long DEFAULT_PROBE_TIMEOUT = 5 * 60 * 1000L; // 5 minutes
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
overrides.put(HiveConf.ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_IDLE_SLEEP_TIME.varname, "1s");
overrides.put(HiveConf.ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_PROGRESS_REPORT_INTERVAL.varname,
"1s");
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
internalBeforeClassSetup(overrides, TestScheduledReplicationScenarios.class);
}
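/**
* Brings up a two-datanode MiniDFSCluster and creates the primary and replica warehouse
* instances on it, applying the supplied overrides on top of the ACID and replication-test
* defaults.
*/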
static void internalBeforeClassSetup(Map<String, String> overrides,
Class clazz) throws Exception {
conf = new HiveConf(clazz);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
Map<String, String> acidEnableConf = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "true");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
put("hive.metastore.client.capability.check", "false");
put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
put("hive.strict.checks.bucketing", "false");
put("hive.mapred.mode", "nonstrict");
put("mapred.input.dir.recursive", "true");
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
put("hive.in.repl.test", "true");
}};
acidEnableConf.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
}
@Before
public void setup() throws Throwable {
super.setup();
}
@After
public void tearDown() throws Throwable {
primary.run("drop database if exists " + primaryDbName + " cascade");
replica.run("drop database if exists " + replicatedDbName + " cascade");
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}
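/**
* Verifies scheduled REPL DUMP / REPL LOAD for an ACID table: a bootstrap cycle followed by two
* incremental cycles, checking the replicated data on the target after each load acknowledgement.
*/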
@Test
@Ignore("HIVE-23395")
public void testAcidTablesReplLoadBootstrapIncr() throws Throwable {
// Bootstrap
primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
.run("insert into t1 values(1)")
.run("insert into t1 values(2)");
try (ScheduledQueryExecutionService schqS =
ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) {
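// Each scheduled dump writes into a new numbered directory under the Base64-encoded db-name
// root; injectNextDumpDirForTest seeds that counter so the test can compute the directory the
// next cycle will use and wait for its load acknowledgement file there.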
int next = -1;
ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
primary.run("create scheduled query s1_t1 every 5 seconds as repl dump " + primaryDbName);
replica.run("create scheduled query s2_t1 every 5 seconds as repl load " + primaryDbName + " INTO "
+ replicatedDbName);
Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR),
Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8)));
FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1 order by id")
.verifyResults(new String[]{"1", "2"});
// First incremental, after bootstrap
primary.run("use " + primaryDbName)
.run("insert into t1 values(3)")
.run("insert into t1 values(4)");
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1 order by id")
.verifyResults(new String[]{"1", "2", "3", "4"});
// Second incremental
primary.run("use " + primaryDbName)
.run("insert into t1 values(5)")
.run("insert into t1 values(6)");
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1 order by id")
.verifyResults(new String[]{"1", "2", "3", "4", "5", "6"})
.run("drop table t1");
} finally {
primary.run("drop scheduled query s1_t1");
replica.run("drop scheduled query s2_t1");
}
}
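/**
* Verifies scheduled replication of an external table with authorization (Ranger) and Atlas
* metadata enabled in the WITH clause: checks the bootstrapped data, compares the collected
* replication metrics against the expected dump and load stages, then runs one incremental cycle.
*/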
@Test
@Ignore("HIVE-23395")
public void testExternalTablesReplLoadBootstrapIncr() throws Throwable {
// Bootstrap
String withClause = " WITH('" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA
+ "' = 'true' ,'" + HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA + "' = 'true' , '"
+ HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_ENDPOINT
+ "' = 'http://localhost:21000/atlas'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB + "' = 'tgt'"
+ ",'"+ HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME + "' = 'cluster0'"
+ ",'"+ HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME + "' = 'cluster1')";
primary.run("use " + primaryDbName)
.run("create external table t2 (id int)")
.run("insert into t2 values(1)")
.run("insert into t2 values(2)");
try (ScheduledQueryExecutionService schqS =
ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) {
int next = -1;
ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
primary.run("create scheduled query s1_t2 every 5 seconds as repl dump " + primaryDbName + withClause);
replica.run("create scheduled query s2_t2 every 5 seconds as repl load " + primaryDbName + " INTO "
+ replicatedDbName + withClause);
Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR),
Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8)));
FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResult("t2")
.run("select id from t2 order by id")
.verifyResults(new String[]{"1", "2"});
long lastReplId = Long.parseLong(primary.status(replicatedDbName).getOutput().get(0));
DumpMetaData dumpMetaData = new DumpMetaData(ackPath.getParent(), primary.hiveConf);
List<ReplicationMetric> replicationMetrics = MetricCollector.getInstance().getMetrics();
Assert.assertEquals(2, replicationMetrics.size());
//Generate expected metrics
List<ReplicationMetric> expectedReplicationMetrics = new ArrayList<>();
expectedReplicationMetrics.add(generateExpectedMetric("s1_t2", 0, primaryDbName,
Metadata.ReplicationType.BOOTSTRAP, ackPath.getParent().toString(), lastReplId, Status.SUCCESS,
generateDumpStages(true)));
expectedReplicationMetrics.add(generateExpectedMetric("s2_t2",
dumpMetaData.getDumpExecutionId(), replicatedDbName,
Metadata.ReplicationType.BOOTSTRAP, ackPath.getParent().toString(), lastReplId, Status.SUCCESS,
generateLoadStages(true)));
checkMetrics(expectedReplicationMetrics, replicationMetrics);
// First incremental, after bootstrap
primary.run("use " + primaryDbName)
.run("insert into t2 values(3)")
.run("insert into t2 values(4)");
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResult("t2")
.run("select id from t2 order by id")
.verifyResults(new String[]{"1", "2", "3", "4"});
} finally {
primary.run("drop scheduled query s1_t2");
replica.run("drop scheduled query s2_t2");
}
}
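/**
* End-to-end planned failover test: replicates a source database, initiates failover via
* HIVE_REPL_FAILOVER_START, verifies the failover-ready marker and the failover endpoint
* properties on both warehouses, drops the old source and bootstraps replication in the reverse
* direction, then fails back to restore the original direction.
*/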
@Test
@Ignore("HIVE-25720")
public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
String withClause = "'" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true','"
+ HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'" ;
String sourceDbName = "sourceDbName";
String replicaDbName = "replicaDbName";
// Create a table with some data at source DB.
primary.run("create database " + sourceDbName + " with dbproperties('repl.source.for'='a')")
.run("use " + sourceDbName)
.run("create table t2 (id int)").run("insert into t2 values(1)").run("insert into t2 values(2)");
// Schedule Dump & Load and verify the data is replicated properly.
try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService
.startScheduledQueryExecutorService(primary.hiveConf)) {
int next = -1;
ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
primary.run("create scheduled query repl_dump_p1 every 5 seconds as repl dump "
+ sourceDbName + " WITH(" + withClause + ')');
replica.run("create scheduled query repl_load_p1 every 5 seconds as repl load "
+ sourceDbName + " INTO " + replicaDbName + " WITH(" + withClause + ')');
Path dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, sourceDbName.toLowerCase());
FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicaDbName).run("show tables like 't2'")
.verifyResult("t2").run("select id from t2 order by id")
.verifyResults(new String[] {"1", "2"});
//Start failover from here.
String startFailoverClause = withClause.concat(",'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
primary.run("alter scheduled query repl_dump_p1 defined as repl dump " + sourceDbName + " WITH(" + startFailoverClause + ')');
replica.run("alter scheduled query repl_load_p1 defined as repl load "
+ sourceDbName + " INTO " + replicaDbName + " WITH(" + startFailoverClause + ')');
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot,
String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
Path failoverReadyMarker = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.FAILOVER_READY_MARKER.toString());
assertTrue(fs.exists(failoverReadyMarker));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(sourceDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicaDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
primary.run("alter scheduled query repl_dump_p1 disabled")
.run("alter scheduled query repl_dump_p1 defined as repl dump "
+ sourceDbName + " WITH(" + withClause + ')')
.run("alter database " + sourceDbName + " set dbproperties('" + SOURCE_OF_REPLICATION + "'='')")
.run("drop database " + sourceDbName + " cascade");
assertTrue(primary.getDatabase(sourceDbName) == null);
replica.run("alter scheduled query repl_load_p1 disabled")
.run("alter scheduled query repl_load_p1 defined as repl load "
+ sourceDbName + " INTO " + replicaDbName + " WITH(" + withClause + ')')
.run("create scheduled query repl_dump_p2 every 5 seconds as repl dump " + replicaDbName + " WITH(" + withClause + ')');
primary.run("create scheduled query repl_load_p2 every 5 seconds as repl load "
+ replicaDbName + " INTO " + sourceDbName + " WITH(" + withClause + ')');
dumpRoot = ReplUtils.getEncodedDumpRootPath(replica.hiveConf, replicaDbName.toLowerCase());
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot,
String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicaDbName)));
Database primaryDb = primary.getDatabase(sourceDbName);
assertFalse(primaryDb == null);
assertTrue(MetaStoreUtils.isTargetOfReplication(primaryDb));
assertFalse(MetaStoreUtils.isDbBeingPlannedFailedOver(primaryDb));
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot,
String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(sourceDbName).getParameters()));
assertFalse(MetaStoreUtils.isDbBeingPlannedFailedOver(replica.getDatabase(replicaDbName)));
//Start failback from here.
replica.run("alter scheduled query repl_dump_p2 defined as repl dump " + replicaDbName + " WITH(" + startFailoverClause + ')');
primary.run("alter scheduled query repl_load_p2 defined as repl load "
+ replicaDbName + " INTO " + sourceDbName + " WITH(" + startFailoverClause + ')');
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
failoverReadyMarker = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.FAILOVER_READY_MARKER.toString());
assertTrue(fs.exists(failoverReadyMarker));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(replica.getDatabase(replicaDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
assertTrue(MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(primary.getDatabase(sourceDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
replica.run("alter scheduled query repl_dump_p2 disabled")
.run("alter scheduled query repl_dump_p2 defined as repl dump "
+ replicaDbName + " WITH(" + withClause + ')')
.run("alter database " + replicaDbName + " set dbproperties('" + SOURCE_OF_REPLICATION + "'='')")
.run("drop database " + replicaDbName + " cascade")
.run("alter scheduled query repl_load_p1 enabled");
assertTrue(replica.getDatabase(replicaDbName) == null);
primary.run("alter scheduled query repl_load_p2 disabled")
.run("alter scheduled query repl_load_p2 defined as repl load "
+ replicaDbName + " INTO " + sourceDbName + " WITH(" + withClause + ')')
.run("alter scheduled query repl_dump_p1 enabled");
dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, sourceDbName.toLowerCase());
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
assertFalse(MetaStoreUtils.isTargetOfReplication(primary.getDatabase(sourceDbName)));
Database replicaDb = replica.getDatabase(replicaDbName);
assertFalse(replicaDb == null);
assertTrue(MetaStoreUtils.isTargetOfReplication(replicaDb));
assertFalse(MetaStoreUtils.isDbBeingPlannedFailedOver(replicaDb));
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
ackPath = new Path(dumpRoot,
String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicaDbName).getParameters()));
assertFalse(MetaStoreUtils.isDbBeingPlannedFailedOver(primary.getDatabase(sourceDbName)));
} finally {
primary.run("drop database if exists " + sourceDbName + " cascade").run("drop scheduled query repl_dump_p1");
replica.run("drop database if exists " + replicaDbName + " cascade").run("drop scheduled query repl_load_p1");
primary.run("drop scheduled query repl_load_p2");
replica.run("drop scheduled query repl_dump_p2");
}
}
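/**
* Verifies that a scheduled REPL DUMP sets the repl.source.for property on the source database
* to the policy name, both at bootstrap and on subsequent incremental cycles, and that a newly
* created policy id is appended to the existing value.
*/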
@Test
@Ignore("HIVE-25720")
public void testSetPolicyId() throws Throwable {
String withClause =
" WITH('" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"
+ HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME + "' = 'cluster0'"
+ ",'" + HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME
+ "' = 'cluster1')";
// Create a table with some data at source DB.
primary.run("use " + primaryDbName).run("create table t2 (id int)")
.run("insert into t2 values(1)").run("insert into t2 values(2)");
// Remove the SOURCE_OF_REPLICATION property from the database.
primary.run("ALTER DATABASE " + primaryDbName + " Set DBPROPERTIES ( '"
+ SOURCE_OF_REPLICATION + "' = '')");
assertFalse(ReplChangeManager.isSourceOfReplication(primary.getDatabase(primaryDbName)));
// Schedule Dump & Load and verify the data is replicated properly.
try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService
.startScheduledQueryExecutorService(primary.hiveConf)) {
int next = -1;
ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
primary.run("create scheduled query s1_t2 every 5 seconds as repl dump " + primaryDbName + withClause);
replica.run("create scheduled query s2_t2 every 5 seconds as repl load "
+ primaryDbName + " INTO " + replicatedDbName + withClause);
Path dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, primaryDbName.toLowerCase());
FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
Path ackPath = new Path(dumpRoot,
String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+ File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
replica.run("use " + replicatedDbName).run("show tables like 't2'")
.verifyResult("t2").run("select id from t2 order by id")
.verifyResults(new String[] {"1", "2"});
// Check that the scheduled dump set the repl.source.for policy id to the scheduled query name.
assertTrue(ReplChangeManager.getReplPolicyIdString(primary.getDatabase(primaryDbName)).equals("s1_t2"));
// Remove the SOURCE_OF_REPLICATION property from the database.
primary.run("ALTER DATABASE " + primaryDbName + " Set DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '')");
assertFalse(ReplChangeManager.isSourceOfReplication(primary.getDatabase(primaryDbName)));
// Ensure that repl.source.for is set again during the incremental iteration of replication.
GenericTestUtils.waitFor(() -> {
try {
return ReplChangeManager.getReplPolicyIdString(primary.getDatabase(primaryDbName)).equals("s1_t2");
} catch (Throwable e) {
return false;
}
}, 100, 10000);
// Verify that a newly created policy id is appended to the existing repl.source.for value.
primary.run("drop scheduled query s1_t2");
fs.delete(dumpRoot, true);
primary.run("create scheduled query s1_t2_new every 5 seconds as repl " + "dump " + primaryDbName + withClause);
GenericTestUtils.waitFor(() -> {
try {
return ReplChangeManager.getReplPolicyIdString(primary.getDatabase(primaryDbName)).equals("s1_t2, s1_t2_new");
} catch (Throwable e) {
return false;
}
}, 100, 10000);
} finally {
primary.run("drop scheduled query s1_t2_new");
replica.run("drop scheduled query s2_t2");
}
}
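/**
* Matches each actual ReplicationMetric against the expected one with the same policy name and
* asserts that dump execution id, metadata, progress status, stages and per-stage metric counts
* all agree.
*/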
private void checkMetrics(List<ReplicationMetric> expectedReplicationMetrics,
List<ReplicationMetric> actualMetrics) {
Assert.assertEquals(expectedReplicationMetrics.size(), actualMetrics.size());
int metricCounter = 0;
for (ReplicationMetric actualMetric : actualMetrics) {
for (ReplicationMetric expecMetric : expectedReplicationMetrics) {
if (actualMetric.getPolicy().equalsIgnoreCase(expecMetric.getPolicy())) {
Assert.assertEquals(expecMetric.getDumpExecutionId(), actualMetric.getDumpExecutionId());
Assert.assertEquals(expecMetric.getMetadata().getDbName(), actualMetric.getMetadata().getDbName());
Assert.assertEquals(expecMetric.getMetadata().getLastReplId(),
actualMetric.getMetadata().getLastReplId());
Assert.assertEquals(expecMetric.getMetadata().getStagingDir(),
actualMetric.getMetadata().getStagingDir());
Assert.assertEquals(expecMetric.getMetadata().getReplicationType(),
actualMetric.getMetadata().getReplicationType());
Assert.assertEquals(expecMetric.getProgress().getStatus(), actualMetric.getProgress().getStatus());
Assert.assertEquals(expecMetric.getProgress().getStages().size(),
actualMetric.getProgress().getStages().size());
List<Stage> expectedStages = expecMetric.getProgress().getStages();
List<Stage> actualStages = actualMetric.getProgress().getStages();
int counter = 0;
for (Stage actualStage : actualStages) {
for (Stage expeStage : expectedStages) {
if (actualStage.getName().equalsIgnoreCase(expeStage.getName())) {
Assert.assertEquals(expeStage.getStatus(), actualStage.getStatus());
Assert.assertEquals(expeStage.getMetrics().size(), actualStage.getMetrics().size());
for (Metric actMetric : actualStage.getMetrics()) {
for (Metric expMetric : expeStage.getMetrics()) {
if (actMetric.getName().equalsIgnoreCase(expMetric.getName())) {
Assert.assertEquals(expMetric.getTotalCount(), actMetric.getTotalCount());
Assert.assertEquals(expMetric.getCurrentCount(), actMetric.getCurrentCount());
}
}
}
counter++;
if (counter == actualStages.size()) {
break;
}
}
}
}
metricCounter++;
if (metricCounter == actualMetrics.size()) {
break;
}
}
}
}
}
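/**
* Builds the expected RANGER_LOAD, ATLAS_LOAD and REPL_LOAD stages for a single-table policy:
* one table and zero functions for a bootstrap load, one event for an incremental load.
*/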
private List<Stage> generateLoadStages(boolean isBootstrap) {
List<Stage> stages = new ArrayList<>();
// Ranger
Stage rangerLoad = new Stage("RANGER_LOAD", Status.SUCCESS, 0);
Metric rangerMetric = new Metric(ReplUtils.MetricName.POLICIES.name(), 0);
rangerLoad.addMetric(rangerMetric);
stages.add(rangerLoad);
// Atlas
Stage atlasLoad = new Stage("ATLAS_LOAD", Status.SUCCESS, 0);
Metric atlasMetric = new Metric(ReplUtils.MetricName.ENTITIES.name(), 0);
atlasLoad.addMetric(atlasMetric);
stages.add(atlasLoad);
// Hive
Stage replLoad = new Stage("REPL_LOAD", Status.SUCCESS, 0);
if (isBootstrap) {
Metric hiveMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 1);
hiveMetric.setCurrentCount(1);
replLoad.addMetric(hiveMetric);
hiveMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
replLoad.addMetric(hiveMetric);
} else {
Metric hiveMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 1);
hiveMetric.setCurrentCount(1);
replLoad.addMetric(hiveMetric);
}
stages.add(replLoad);
return stages;
}
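/**
* Builds the expected RANGER_DUMP, ATLAS_DUMP and REPL_DUMP stages for a single-table policy:
* one table and zero functions for a bootstrap dump, one event for an incremental dump.
*/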
private List<Stage> generateDumpStages(boolean isBootstrap) {
List<Stage> stages = new ArrayList<>();
//Ranger
Stage rangerDump = new Stage("RANGER_DUMP", Status.SUCCESS, 0);
Metric rangerMetric = new Metric(ReplUtils.MetricName.POLICIES.name(), 0);
rangerDump.addMetric(rangerMetric);
stages.add(rangerDump);
//Atlas
Stage atlasDump = new Stage("ATLAS_DUMP", Status.SUCCESS, 0);
Metric atlasMetric = new Metric(ReplUtils.MetricName.ENTITIES.name(), 0);
atlasDump.addMetric(atlasMetric);
stages.add(atlasDump);
//Hive
Stage replDump = new Stage("REPL_DUMP", Status.SUCCESS, 0);
if (isBootstrap) {
Metric hiveMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 1);
hiveMetric.setCurrentCount(1);
replDump.addMetric(hiveMetric);
hiveMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
replDump.addMetric(hiveMetric);
} else {
Metric hiveMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 1);
hiveMetric.setCurrentCount(1);
replDump.addMetric(hiveMetric);
}
stages.add(replDump);
return stages;
}
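/**
* Assembles a ReplicationMetric with the given metadata (database, replication type, staging
* directory, last replication id) and a Progress carrying the supplied status and stages.
*/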
private ReplicationMetric generateExpectedMetric(String policy, long dumpExecId, String dbName,
Metadata.ReplicationType replicationType, String staging,
long lastReplId, Status status, List<Stage> stages) {
Metadata metadata = new Metadata(dbName, replicationType, staging);
metadata.setLastReplId(lastReplId);
ReplicationMetric replicationMetric = new ReplicationMetric(0, policy, dumpExecId, metadata);
Progress progress = new Progress();
progress.setStatus(status);
for (Stage stage : stages) {
progress.addStage(stage);
}
replicationMetric.setProgress(progress);
return replicationMetric;
}
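/**
* Polls the filesystem until the given ack file exists, throwing an IOException if it has not
* appeared within the given timeout (in milliseconds).
*/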
private void waitForAck(FileSystem fs, Path ackFile, long timeout) throws IOException {
long oldTime = System.currentTimeMillis();
long sleepInterval = 2;
while (true) {
if (fs.exists(ackFile)) {
return;
}
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
// ignore the interruption and keep polling until the timeout elapses
}
if (System.currentTimeMillis() - oldTime > timeout) {
break;
}
}
throw new IOException("Timed out waiting for the ack file: " + ackFile.toString());
}
}