| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.ql.parse; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; |
| import org.apache.hadoop.hive.metastore.api.Database; |
| import org.apache.hadoop.hive.metastore.api.Table; |
| import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; |
| import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; |
| import org.apache.hadoop.hive.metastore.api.NotificationEvent; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf; |
| import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; |
| import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; |
| import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import javax.annotation.Nullable; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION; |
| import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE; |
| import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Test replication scenarios with staging on replica. |
| */ |
| public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances { |
| |
  // Name of an auxiliary database ("extra_" + primaryDbName) that some tests
  // create on the primary cluster to generate notification events unrelated to
  // the replicated db (those events must get filtered out). Initialised in setup().
  String extraPrimaryDb;
| |
| @BeforeClass |
| public static void classLevelSetup() throws Exception { |
| Map<String, String> overrides = new HashMap<>(); |
| overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), |
| GzipJSONMessageEncoder.class.getCanonicalName()); |
| overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); |
| overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); |
| overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, |
| UserGroupInformation.getCurrentUser().getUserName()); |
| internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationScenarios.class); |
| } |
| |
| @Before |
| public void setup() throws Throwable { |
| super.setup(); |
| extraPrimaryDb = "extra_" + primaryDbName; |
| } |
| |
| @After |
| public void tearDown() throws Throwable { |
| super.tearDown(); |
| } |
| |
| @Test |
| public void testTargetEventIdGenerationAfterFirstIncrementalInOptFailover() throws Throwable { |
| List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); |
| withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); |
| |
| // Do a bootstrap cycle(A->B) |
| primary.dump(primaryDbName, withClause); |
| replica.load(replicatedDbName, primaryDbName, withClause); |
| |
| // Add some table & do an incremental dump. |
| WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) |
| .run("create external table table1 (id int)") |
| .run("insert into table table1 values (100)") |
| .run("create table table1_managed (name string)") |
| .run("insert into table table1_managed values ('ABC')") |
| .dump(primaryDbName, withClause); |
| |
| // Do an incremental load |
| replica.load(replicatedDbName, primaryDbName, withClause); |
| |
| // Get the latest notification from the notification log for the target database, just after replication. |
| CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); |
| |
| // Check the tables are there post incremental load. |
| replica.run("repl status " + replicatedDbName) |
| .verifyResult(tuple.lastReplicationId) |
| .run("use " + replicatedDbName) |
| .run("select id from table1") |
| .verifyResult("100") |
| .run("select name from table1_managed") |
| .verifyResult("ABC") |
| .verifyReplTargetProperty(replicatedDbName); |
| |
| // Do some modifications on the source cluster, so we have some entries in the table diff. |
| primary.run("use " + primaryDbName) |
| .run("create table table2_managed (id string)") |
| .run("insert into table table1_managed values ('SDC')") |
| .run("insert into table table2_managed values ('A'),('B'),('C')"); |
| |
| |
| // Do some modifications in another database to have unrelated events as well after the last load, which should |
| // get filtered. |
| |
| primary.run("create database " + extraPrimaryDb) |
| .run("use " + extraPrimaryDb) |
| .run("create external table t1 (id int)") |
| .run("insert into table t1 values (15),(1),(96)") |
| .run("create table t1_managed (id string)") |
| .run("insert into table t1_managed values ('SA'),('PS')"); |
| |
| // Do some modifications on the target database. |
| replica.run("use " + replicatedDbName) |
| .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key1'='value1')") |
| .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key2'='value2')"); |
| |
| // Validate the current replication id on original target has changed now. |
| assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId()); |
| |
| // Prepare for reverse replication. |
| DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); |
| Path newReplDir = new Path(replica.repldDir + "reverse1"); |
| replicaFs.mkdirs(newReplDir); |
| withClause = ReplicationTestUtils.includeExternalTableClause(true); |
| withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); |
| |
| tuple = replica.dump(replicatedDbName); |
| |
| // Check event ack file should get created. |
| assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", |
| replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); |
| |
| // Get the target event id. |
| NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf) |
| .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1, |
| new DatabaseAndTableFilter(replicatedDbName, null)); |
| |
| // There should be 2 events, two custom alter operations. |
| assertEquals(2, nl.getEvents().size()); |
| } |
| |
  /**
   * Optimised failover after two incremental cycles (A-&gt;B): the reverse dump
   * (B-&gt;A) must write the EVENT_ACK_FILE with the target's last event id, and
   * no notification event for the replicated database should exist beyond that
   * id (unrelated-database and source-side events must be excluded).
   */
  @Test
  public void testTargetEventIdGenerationInOptmisedFailover() throws Throwable {
    // Do a cycle of bootstrap dump & load.
    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");

    // Do a bootstrap cycle(A->B)
    primary.dump(primaryDbName, withClause);
    replica.load(replicatedDbName, primaryDbName, withClause);

    // Add some table & do the first incremental dump.
    primary.run("use " + primaryDbName)
        .run("create external table tablei1 (id int)")
        .run("create external table tablei2 (id int)")
        .run("create table tablem1 (id int)")
        .run("create table tablem2 (id int)")
        .run("insert into table tablei1 values(1),(2),(3),(4)")
        .run("insert into table tablei2 values(10),(20),(30),(40)")
        .run("insert into table tablem1 values(5),(10),(15),(20)")
        .run("insert into table tablem2 values(6),(12),(18),(24)")
        .dump(primaryDbName, withClause);

    // Do the incremental load, and check everything is intact.
    replica.load(replicatedDbName, primaryDbName, withClause)
        .run("use "+ replicatedDbName)
        .run("select id from tablei1")
        .verifyResults(new String[]{"1","2","3","4"})
        .run("select id from tablei2")
        .verifyResults(new String[]{"10","20","30","40"})
        .run("select id from tablem1")
        .verifyResults(new String[]{"5","10","15","20"})
        .run("select id from tablem2")
        .verifyResults(new String[]{"6","12","18","24"});

    // Do some modifications & call for the second cycle of incremental dump & load.
    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
        .run("create external table table1 (id int)")
        .run("insert into table table1 values (25),(35),(82)")
        .run("create table table1_managed (name string)")
        .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')")
        .run("insert into table tablei1 values(15),(62),(25),(62)")
        .run("insert into table tablei2 values(10),(22),(11),(22)")
        .run("insert into table tablem1 values(5),(10),(15),(20)")
        .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
        .dump(primaryDbName, withClause);

    // Do an incremental load.
    replica.load(replicatedDbName, primaryDbName, withClause);

    // Latest notification id on the target, captured just after replication.
    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();

    // Check the tables are there post incremental load.
    replica.run("repl status " + replicatedDbName)
        .verifyResult(tuple.lastReplicationId)
        .run("use " + replicatedDbName)
        .run("select id from table1")
        .verifyResults(new String[]{"25", "35", "82"})
        .run("select name from table1_managed")
        .verifyResults(new String[]{"CAD", "DAS", "MSA"})
        .verifyReplTargetProperty(replicatedDbName);

    // Do some modifications on the source cluster, so we have some entries in the table diff.
    primary.run("use " + primaryDbName)
        .run("create table table2_managed (id string)")
        .run("insert into table table1_managed values ('AAA'),('BBB')")
        .run("insert into table table2_managed values ('A1'),('B1'),('C2')");

    // Do some modifications in another database to have unrelated events as well after the
    // last load, which should get filtered.
    primary.run("create database " + extraPrimaryDb)
        .run("use " + extraPrimaryDb)
        .run("create external table table1 (id int)")
        .run("insert into table table1 values (15),(1),(96)")
        .run("create table table1_managed (id string)")
        .run("insert into table table1_managed values ('SAA'),('PSA')");

    // Do some modifications on the target database.
    replica.run("use " + replicatedDbName)
        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl1'='value1')")
        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl2'='value2')");

    // Validate the current replication id on the original target has changed now.
    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());

    // Prepare for reverse replication: a fresh repl root on the replica side.
    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
    Path newReplDir = new Path(replica.repldDir + "reverse01");
    replicaFs.mkdirs(newReplDir);
    withClause = ReplicationTestUtils.includeExternalTableClause(true);
    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");

    tuple = replica.dump(replicatedDbName, withClause);

    // Check the event ack file got created.
    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));

    // Read the acked target event id back from the dump; there must be no events
    // for the replicated db past it (unlike the sibling test, maxEvents=10 here).
    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
        .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
            new DatabaseAndTableFilter(replicatedDbName, null));

    assertEquals(0, nl.getEventsSize());
  }
| |
  /**
   * Optimised failover when notification events have expired/gone missing: the
   * injected behaviour drops every event of the replicated database from the
   * notification fetch, so the reverse dump cannot determine the target event
   * id. The dump must fail and leave a non-recoverable marker in the new dump
   * directory.
   */
  @Test
  public void testTargetEventIdWithNotificationsExpiredInOptimisedFailover() throws Throwable {
    // Do a cycle of bootstrap dump & load.
    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");

    // Do a bootstrap cycle(A->B)
    primary.dump(primaryDbName, withClause);
    replica.load(replicatedDbName, primaryDbName, withClause);

    // Add some table & do the first incremental dump.
    primary.run("use " + primaryDbName)
        .run("create external table tablei1 (id int)")
        .run("create table tablem1 (id int)")
        .run("insert into table tablei1 values(1),(2),(3),(4)")
        .run("insert into table tablem1 values(5),(10),(15),(20)")
        .dump(primaryDbName, withClause);

    // Do the incremental load, and check everything is intact.
    replica.load(replicatedDbName, primaryDbName, withClause)
        .run("use "+ replicatedDbName)
        .run("select id from tablei1")
        .verifyResults(new String[]{"1","2","3","4"})
        .run("select id from tablem1")
        .verifyResults(new String[]{"5","10","15","20"});

    // Explicitly make the notification logs.
    // Get the latest notification from the notification log for the target database, just after replication.
    // NOTE(review): notificationIdAfterRepl is captured but not asserted on in this
    // test — confirm whether the capture is still needed.
    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
    // Inject a behaviour where some events are missing from the notification_log table.
    // This ensures the incremental dump doesn't get all events for replication.
    InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
        eventIdSkipper =
        new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {

          @Nullable
          @Override
          public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
            if (null != eventIdList) {
              List<NotificationEvent> eventIds = eventIdList.getEvents();
              List<NotificationEvent> outEventIds = new ArrayList<>();
              for (NotificationEvent event : eventIds) {
                // Skip every event of the replicated db, simulating expired entries.
                if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
                  injectionPathCalled = true;
                  continue;
                }
                outEventIds.add(event);
              }

              // Return the filtered list.
              return new NotificationEventResponse(outEventIds);
            } else {
              return null;
            }
          }
        };

    try {
      InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);

      // Prepare for reverse replication.
      DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
      Path newReplDir = new Path(replica.repldDir + "reverse01");
      replicaFs.mkdirs(newReplDir);
      withClause = ReplicationTestUtils.includeExternalTableClause(true);
      withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");

      try {
        replica.dump(replicatedDbName, withClause);
        fail("Expected the dump to fail since the notification event is missing.");
      } catch (Exception e) {
        // Expected due to missing notification log entry.
      }

      // The failed dump must have recorded a non-recoverable error marker.
      Path nonRecoverablePath =
          TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
      assertTrue(replicaFs.exists(nonRecoverablePath));
    } finally {
      InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
    }
  }
| |
| @Test |
| public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwable { |
| List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true); |
| withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'"); |
| WarehouseInstance.Tuple tuple = primary |
| .run("use " + primaryDbName) |
| .run("create external table t1 (id int)") |
| .run("insert into table t1 values (100)") |
| .run("create table t2 (id int)") |
| .run("insert into table t2 values (200)") |
| .dump(primaryDbName, withClauseOptions); |
| |
| // verify that the external table list is written correctly for bootstrap |
| ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica); |
| |
| replica.load(replicatedDbName, primaryDbName, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResult("100") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResult("200"); |
| |
| tuple = primary.run("use " + primaryDbName) |
| .run("create external table t3 (id int)") |
| .run("insert into table t3 values (300)") |
| .run("create table t4 (id int)") |
| .run("insert into table t4 values (400)") |
| .dump(primaryDbName, withClauseOptions); |
| |
| // verify that the external table list is written correctly for incremental |
| ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica); |
| |
| replica.load(replicatedDbName, primaryDbName, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("show tables like 't3'") |
| .verifyResult("t3") |
| .run("show tables like 't4'") |
| .verifyResult("t4") |
| .run("select id from t1") |
| .verifyResult("100") |
| .run("select id from t2") |
| .verifyResult("200") |
| .run("select id from t3") |
| .verifyResult("300") |
| .run("select id from t4") |
| .verifyResult("400"); |
| } |
| |
  /**
   * Table-level replication (policy {@code db.'t[0-9]+'}) with staging on the
   * replica cluster: verifies both the external-table file list and the dumped
   * table list for the bootstrap and incremental cycles, including a
   * partitioned table in the incremental cycle.
   */
  @Test
  public void testTableLevelReplicationWithRemoteStaging() throws Throwable {
    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
    WarehouseInstance.Tuple tuple = primary
        .run("use " + primaryDbName)
        .run("create external table t1 (id int)")
        .run("insert into table t1 values (100)")
        .run("create table t2 (id int)")
        .run("insert into table t2 values (200)")
        .dump(primaryDbName +".'t[0-9]+'", withClauseOptions);

    // Verify that the external table list is written correctly for bootstrap.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica);

    // Verify the table list matched by the policy regex.
    verifyTableListForPolicy(replica.miniDFSCluster.getFileSystem(),
        tuple.dumpLocation, new String[]{"t1", "t2"});

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("select id from t1")
        .verifyResult("100")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("select id from t2")
        .verifyResult("200");

    tuple = primary.run("use " + primaryDbName)
        .run("create external table t3 (id int)")
        .run("insert into table t3 values (300)")
        .run("create table t4 (id int)")
        .run("insert into table t4 values (400)")
        .run("create table t5 (id int) partitioned by (p int)")
        .run("insert into t5 partition(p=1) values(10)")
        .run("insert into t5 partition(p=2) values(20)")
        .dump(primaryDbName + ".'t[0-9]+'", withClauseOptions);

    // Verify that the external table list is written correctly for incremental.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica);

    // Verify the table list now also contains the newly added tables.
    verifyTableListForPolicy(replica.miniDFSCluster.getFileSystem(),
        tuple.dumpLocation, new String[]{"t1", "t2", "t3", "t4", "t5"});

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("show tables like 't3'")
        .verifyResult("t3")
        .run("show tables like 't4'")
        .verifyResult("t4")
        .run("show tables like 't5'")
        .verifyResult("t5")
        .run("select id from t1")
        .verifyResult("100")
        .run("select id from t2")
        .verifyResult("200")
        .run("select id from t3")
        .verifyResult("300")
        .run("select id from t4")
        .verifyResult("400")
        .run("select id from t5")
        .verifyResults(new String[]{"10", "20"});
  }
| |
| @Test |
| public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable { |
| List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, true); |
| withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'"); |
| WarehouseInstance.Tuple tuple = primary |
| .run("use " + primaryDbName) |
| .run("create external table t1 (id int)") |
| .run("insert into table t1 values (500)") |
| .run("create table t2 (id int)") |
| .run("insert into table t2 values (600)") |
| .dump(primaryDbName, withClauseOptions); |
| |
| // verify that the external table list is written correctly for bootstrap |
| ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, primary); |
| |
| replica.load(replicatedDbName, primaryDbName, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResult("500") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResult("600"); |
| |
| tuple = primary.run("use " + primaryDbName) |
| .run("create external table t3 (id int)") |
| .run("insert into table t3 values (700)") |
| .run("create table t4 (id int)") |
| .run("insert into table t4 values (800)") |
| .dump(primaryDbName, withClauseOptions); |
| |
| // verify that the external table list is written correctly for incremental |
| ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, primary); |
| |
| replica.load(replicatedDbName, primaryDbName, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("show tables like 't3'") |
| .verifyResult("t3") |
| .run("show tables like 't4'") |
| .verifyResult("t4") |
| .run("select id from t1") |
| .verifyResult("500") |
| .run("select id from t2") |
| .verifyResult("600") |
| .run("select id from t3") |
| .verifyResult("700") |
| .run("select id from t4") |
| .verifyResult("800"); |
| } |
| |
  /**
   * Bootstrap + incremental replication using distcp-based copy, with the
   * staging directory on the remote (replica) cluster and data-copy tasks
   * executed on the source (REPL_RUN_DATA_COPY_TASKS_ON_TARGET=false).
   */
  @Test
  public void testDistCpCopyWithRemoteStagingAndCopyTaskOnSource() throws Throwable {
    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
    WarehouseInstance.Tuple tuple = primary
        .run("use " + primaryDbName)
        .run("create external table t1 (id int)")
        .run("insert into table t1 values (100)")
        .run("create table t2 (id int)")
        .run("insert into table t2 values (200)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for bootstrap.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("select id from t1")
        .verifyResult("100")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("select id from t2")
        .verifyResult("200");

    tuple = primary.run("use " + primaryDbName)
        .run("create external table t3 (id int)")
        .run("insert into table t3 values (300)")
        .run("create table t4 (id int)")
        .run("insert into table t4 values (400)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for incremental.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("show tables like 't3'")
        .verifyResult("t3")
        .run("show tables like 't4'")
        .verifyResult("t4")
        .run("select id from t1")
        .verifyResult("100")
        .run("select id from t2")
        .verifyResult("200")
        .run("select id from t3")
        .verifyResult("300")
        .run("select id from t4")
        .verifyResult("400");
  }
| |
  /**
   * Bootstrap + incremental replication using distcp-based copy, with the
   * staging directory on the local (primary) cluster and data-copy tasks
   * executed on the source (REPL_RUN_DATA_COPY_TASKS_ON_TARGET=false).
   */
  @Test
  public void testDistCpCopyWithLocalStagingAndCopyTaskOnSource() throws Throwable {
    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, true);
    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
    WarehouseInstance.Tuple tuple = primary
        .run("use " + primaryDbName)
        .run("create external table t1 (id int)")
        .run("insert into table t1 values (500)")
        .run("create table t2 (id int)")
        .run("insert into table t2 values (600)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for bootstrap.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, primary);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("select id from t1")
        .verifyResult("500")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("select id from t2")
        .verifyResult("600");

    tuple = primary.run("use " + primaryDbName)
        .run("create external table t3 (id int)")
        .run("insert into table t3 values (700)")
        .run("create table t4 (id int)")
        .run("insert into table t4 values (800)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for incremental.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, primary);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("show tables like 't3'")
        .verifyResult("t3")
        .run("show tables like 't4'")
        .verifyResult("t4")
        .run("select id from t1")
        .verifyResult("500")
        .run("select id from t2")
        .verifyResult("600")
        .run("select id from t3")
        .verifyResult("700")
        .run("select id from t4")
        .verifyResult("800");
  }
| |
  /**
   * Bootstrap + incremental replication using regular (non-distcp) copy, with
   * the staging directory on the remote (replica) cluster and data-copy tasks
   * executed on the source (REPL_RUN_DATA_COPY_TASKS_ON_TARGET=false).
   */
  @Test
  public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable {
    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, false);
    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
    WarehouseInstance.Tuple tuple = primary
        .run("use " + primaryDbName)
        .run("create external table t1 (id int)")
        .run("insert into table t1 values (500)")
        .run("create table t2 (id int)")
        .run("insert into table t2 values (600)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for bootstrap.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("select id from t1")
        .verifyResult("500")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("select id from t2")
        .verifyResult("600");

    tuple = primary.run("use " + primaryDbName)
        .run("create external table t3 (id int)")
        .run("insert into table t3 values (700)")
        .run("create table t4 (id int)")
        .run("insert into table t4 values (800)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for incremental.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("show tables like 't3'")
        .verifyResult("t3")
        .run("show tables like 't4'")
        .verifyResult("t4")
        .run("select id from t1")
        .verifyResult("500")
        .run("select id from t2")
        .verifyResult("600")
        .run("select id from t3")
        .verifyResult("700")
        .run("select id from t4")
        .verifyResult("800");
  }
| |
  /**
   * Bootstrap + incremental replication using regular (non-distcp) copy, with
   * the staging directory on the local (primary) cluster and data-copy tasks
   * executed on the target (the default; no copy-task override is added here).
   */
  @Test
  public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable {
    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false);
    WarehouseInstance.Tuple tuple = primary
        .run("use " + primaryDbName)
        .run("create external table t1 (id int)")
        .run("insert into table t1 values (500)")
        .run("create table t2 (id int)")
        .run("insert into table t2 values (600)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for bootstrap.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, primary);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("select id from t1")
        .verifyResult("500")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("select id from t2")
        .verifyResult("600");

    tuple = primary.run("use " + primaryDbName)
        .run("create external table t3 (id int)")
        .run("insert into table t3 values (700)")
        .run("create table t4 (id int)")
        .run("insert into table t4 values (800)")
        .dump(primaryDbName, withClauseOptions);

    // Verify that the external table list is written correctly for incremental.
    ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, primary);

    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
        .run("use " + replicatedDbName)
        .run("show tables like 't1'")
        .verifyResult("t1")
        .run("show tables like 't2'")
        .verifyResult("t2")
        .run("show tables like 't3'")
        .verifyResult("t3")
        .run("show tables like 't4'")
        .verifyResult("t4")
        .run("select id from t1")
        .verifyResult("500")
        .run("select id from t2")
        .verifyResult("600")
        .run("select id from t3")
        .verifyResult("700")
        .run("select id from t4")
        .verifyResult("800");
  }
| |
| @Test |
| public void testReplicationWithSnapshotsWithSourceStaging() throws Throwable { |
| List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false); |
| withClauseOptions.add("'" + HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname + "'='" + true + "'"); |
| withClauseOptions.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='" + true + "'"); |
| WarehouseInstance.Tuple tuple = primary |
| .run("use " + primaryDbName) |
| .run("create external table t1 (id int)") |
| .run("insert into table t1 values (500)") |
| .run("create table t2 (id int)") |
| .run("insert into table t2 values (600)") |
| .dump(primaryDbName, withClauseOptions); |
| |
| replica.load(replicatedDbName, primaryDbName, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResult("500") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResult("600"); |
| |
| tuple = primary.run("use " + primaryDbName) |
| .run("create external table t3 (id int)") |
| .run("insert into table t3 values (700)") |
| .run("create table t4 (id int)") |
| .run("insert into table t4 values (800)") |
| .dump(primaryDbName, withClauseOptions); |
| |
| replica.load(replicatedDbName, primaryDbName, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("show tables like 't3'") |
| .verifyResult("t3") |
| .run("show tables like 't4'") |
| .verifyResult("t4") |
| .run("select id from t1") |
| .verifyResult("500") |
| .run("select id from t2") |
| .verifyResult("600") |
| .run("select id from t3") |
| .verifyResult("700") |
| .run("select id from t4") |
| .verifyResult("800"); |
| } |
| |
| @Test |
| public void externalTableReplicationDropDatabase() throws Throwable { |
| String primaryDb = "primarydb1"; |
| String replicaDb = "repldb1"; |
| String tableName = "t1"; |
| List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false); |
| WarehouseInstance.Tuple tuple = primary |
| .run("create database " + primaryDb) |
| .run("alter database "+ primaryDb + " set dbproperties('repl.source.for'='1,2,3')") |
| .run("use " + primaryDb) |
| .run("create external table " + tableName + " (id int)") |
| .run("insert into table " + tableName + " values (500)") |
| .dump(primaryDb, withClauseOptions); |
| |
| replica.load(replicaDb, primaryDb, withClauseOptions) |
| .run("use " + replicaDb) |
| .run("show tables like '" + tableName + "'") |
| .verifyResult(tableName) |
| .run("select id from " + tableName) |
| .verifyResult("500"); |
| |
| Path dbDataLocPrimary = new Path(primary.externalTableWarehouseRoot, primaryDb + ".db"); |
| Path extTableBase = new Path(replica.getConf().get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname)); |
| Path dbDataLocReplica = new Path(extTableBase + dbDataLocPrimary.toUri().getPath()); |
| verifyTableDataExists(primary, dbDataLocPrimary, tableName, true); |
| verifyTableDataExists(replica, dbDataLocReplica, tableName, true); |
| |
| primary.run("show databases like '" + primaryDb + "'") |
| .verifyResult(primaryDb); |
| replica.run("show databases like '" + replicaDb + "'") |
| .verifyResult(replicaDb); |
| primary.run("drop database " + primaryDb + " cascade"); |
| replica.run("drop database " + replicaDb + " cascade"); |
| primary.run("show databases like '" + primaryDb + "'") |
| .verifyResult(null); |
| replica.run("show databases like '" + replicaDb + "'") |
| .verifyResult(null); |
| |
| verifyTableDataExists(primary, dbDataLocPrimary, tableName, false); |
| verifyTableDataExists(replica, dbDataLocReplica, tableName, true); |
| } |
| |
| @Test |
| public void testCustomWarehouseLocations() throws Throwable { |
| List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false); |
| String dbWhManagedLoc = new Path(primary.warehouseRoot.getParent(), "customManagedLoc").toUri().getPath(); |
| String dbWhExternalLoc = new Path(primary.externalTableWarehouseRoot.getParent(), |
| "customExternalLoc").toUri().getPath(); |
| String srcDb = "srcDb"; |
| WarehouseInstance.Tuple tuple = primary |
| .run("create database " + srcDb + " LOCATION '" + dbWhExternalLoc + "' MANAGEDLOCATION '" + dbWhManagedLoc |
| + "' WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')") |
| .run("use " + srcDb) |
| .run("create table t1 (id int)") |
| .run("insert into table t1 values (500)") |
| .run("create external table t2 (id int)") |
| .run("insert into table t2 values (1000)") |
| .run("create table tp1 (id int) partitioned by (p int)") |
| .run("insert into tp1 partition(p=1) values(10)") |
| .run("insert into tp1 partition(p=2) values(20)") |
| .dump(srcDb, withClauseOptions); |
| |
| replica.load(replicatedDbName, srcDb, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResult("500") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResult("1000") |
| .run("show tables like 'tp1'") |
| .verifyResult("tp1") |
| .run("select id from tp1") |
| .verifyResults(new String[]{"10", "20"}); |
| List<String> listOfTables = new ArrayList<>(); |
| listOfTables.addAll(Arrays.asList("t1", "t2", "tp1")); |
| verifyCustomDBLocations(srcDb, listOfTables, dbWhManagedLoc, dbWhExternalLoc, true); |
| primary.run("use " + srcDb) |
| .run("insert into table t1 values (1000)") |
| .run("insert into table t2 values (2000)") |
| .run("insert into tp1 partition(p=1) values(30)") |
| .run("insert into tp1 partition(p=2) values(40)") |
| .dump(srcDb, withClauseOptions); |
| replica.load(replicatedDbName, srcDb, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResults(new String[]{"500", "1000"}) |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResults(new String[]{"1000", "2000"}) |
| .run("show tables like 'tp1'") |
| .verifyResult("tp1") |
| .run("select id from tp1") |
| .verifyResults(new String[]{"10", "20", "30", "40"}); |
| primary.run("use " + srcDb) |
| .run("insert into table t1 values (2000)") |
| .run("insert into table t2 values (3000)") |
| .run("create table t3 (id int)") |
| .run("insert into table t3 values (3000)") |
| .run("create external table t4 (id int)") |
| .run("insert into table t4 values (4000)") |
| .run("insert into tp1 partition(p=1) values(50)") |
| .run("insert into tp1 partition(p=2) values(60)") |
| .run("create table tp2 (id int) partitioned by (p int)") |
| .run("insert into tp2 partition(p=1) values(100)") |
| .run("insert into tp2 partition(p=2) values(200)") |
| .dump(srcDb, withClauseOptions); |
| replica.load(replicatedDbName, srcDb, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResults(new String[]{"500", "1000", "2000"}) |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResults(new String[]{"1000", "2000", "3000"}) |
| .run("show tables like 't3'") |
| .verifyResult("t3") |
| .run("select id from t3") |
| .verifyResults(new String[]{"3000"}) |
| .run("show tables like 't4'") |
| .verifyResult("t4") |
| .run("select id from t4") |
| .verifyResults(new String[]{"4000"}) |
| .run("select id from tp1") |
| .verifyResults(new String[]{"10", "20", "30", "40", "50", "60"}) |
| .run("show tables like 'tp1'") |
| .verifyResult("tp1") |
| .run("select id from tp2") |
| .verifyResults(new String[]{"100", "200"}); |
| listOfTables.addAll(Arrays.asList("t3", "t4", "tp2")); |
| verifyCustomDBLocations(srcDb, listOfTables, dbWhManagedLoc, dbWhExternalLoc, true); |
| } |
| |
| @Test |
| public void testCustomWarehouseLocationsConf() throws Throwable { |
| List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false); |
| String dbWhManagedLoc = new Path(primary.warehouseRoot.getParent(), "customManagedLoc1").toUri().getPath(); |
| String dbWhExternalLoc = new Path(primary.externalTableWarehouseRoot.getParent(), |
| "customExternalLoc1").toUri().getPath(); |
| String srcDb = "srcDbConf"; |
| WarehouseInstance.Tuple tuple = primary |
| .run("create database " + srcDb + " LOCATION '" + dbWhExternalLoc + "' MANAGEDLOCATION '" + dbWhManagedLoc |
| + "' WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')") |
| .run("use " + srcDb) |
| .run("create table t1 (id int)") |
| .run("insert into table t1 values (500)") |
| .run("create external table t2 (id int)") |
| .run("insert into table t2 values (1000)") |
| .dump(srcDb, withClauseOptions); |
| |
| withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname + "'='false'"); |
| replica.load(replicatedDbName, srcDb, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResult("500") |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResult("1000"); |
| List<String> listOfTables = new ArrayList<>(); |
| listOfTables.addAll(Arrays.asList("t1", "t2")); |
| verifyCustomDBLocations(srcDb, listOfTables, dbWhManagedLoc, dbWhExternalLoc, false); |
| primary.run("use " + srcDb) |
| .run("insert into table t1 values (1000)") |
| .run("insert into table t2 values (2000)") |
| .dump(srcDb, withClauseOptions); |
| replica.load(replicatedDbName, srcDb, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResults(new String[]{"500", "1000"}) |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResults(new String[]{"1000", "2000"}); |
| primary.run("use " + srcDb) |
| .run("insert into table t1 values (2000)") |
| .run("insert into table t2 values (3000)") |
| .run("create table t3 (id int)") |
| .run("insert into table t3 values (3000)") |
| .run("create external table t4 (id int)") |
| .run("insert into table t4 values (4000)") |
| .dump(srcDb, withClauseOptions); |
| replica.load(replicatedDbName, srcDb, withClauseOptions) |
| .run("use " + replicatedDbName) |
| .run("show tables like 't1'") |
| .verifyResult("t1") |
| .run("select id from t1") |
| .verifyResults(new String[]{"500", "1000", "2000"}) |
| .run("show tables like 't2'") |
| .verifyResult("t2") |
| .run("select id from t2") |
| .verifyResults(new String[]{"1000", "2000", "3000"}) |
| .run("show tables like 't3'") |
| .verifyResult("t3") |
| .run("select id from t3") |
| .verifyResults(new String[]{"3000"}) |
| .run("show tables like 't4'") |
| .verifyResult("t4") |
| .run("select id from t4") |
| .verifyResults(new String[]{"4000"}); |
| listOfTables.addAll(Arrays.asList("t3", "t4")); |
| verifyCustomDBLocations(srcDb, listOfTables, dbWhManagedLoc, dbWhExternalLoc, false); |
| } |
| |
| private void verifyCustomDBLocations(String srcDb, List<String> listOfTables, String managedCustLocOnSrc, |
| String externalCustLocOnSrc, boolean replaceCustPath) throws Exception { |
| Database replDatabase = replica.getDatabase(replicatedDbName); |
| String managedCustLocOnTgt = new Path(replDatabase.getManagedLocationUri()).toUri().getPath(); |
| String externalCustLocOnTgt = new Path(replDatabase.getLocationUri()).toUri().getPath(); |
| if (replaceCustPath ) { |
| Assert.assertEquals(managedCustLocOnSrc, managedCustLocOnTgt); |
| Assert.assertNotEquals(managedCustLocOnTgt, replica.warehouseRoot.toUri().getPath()); |
| Assert.assertEquals(externalCustLocOnSrc, externalCustLocOnTgt); |
| Assert.assertNotEquals(externalCustLocOnTgt, new Path(replica.externalTableWarehouseRoot, |
| replicatedDbName.toLowerCase() + ".db").toUri().getPath()); |
| } else { |
| Assert.assertNotEquals(managedCustLocOnSrc, null); |
| Assert.assertEquals(managedCustLocOnTgt, new Path(replica.warehouseRoot, |
| replicatedDbName.toLowerCase() + ".db").toUri().getPath()); |
| Assert.assertNotEquals(externalCustLocOnSrc, externalCustLocOnTgt); |
| Assert.assertEquals(externalCustLocOnTgt, new Path(replica.externalTableWarehouseRoot, |
| replicatedDbName.toLowerCase() + ".db").toUri().getPath()); |
| } |
| verifyTableLocations(srcDb, replDatabase, listOfTables, replaceCustPath); |
| } |
| |
| private void verifyTableLocations(String srcDb, Database replDb, List<String> tables, boolean customLocOntgt) |
| throws Exception { |
| String tgtExtBase = replica.getConf().get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); |
| for (String tname: tables) { |
| Table table = replica.getTable(replicatedDbName, tname); |
| if ("EXTERNAL_TABLE".equals(table.getTableType())) { |
| String pathOnSrc = new Path(primary.getTable(srcDb, tname).getSd().getLocation()).toUri().getPath(); |
| Assert.assertEquals(new Path(table.getSd().getLocation()), new Path(tgtExtBase, pathOnSrc.substring(1))); |
| } else { |
| //Managed Table case |
| Path tblPathOnTgt = customLocOntgt |
| ? new Path(replDb.getManagedLocationUri(), tname) |
| : new Path(replica.warehouseRoot, replicatedDbName.toLowerCase() + ".db" + "/" + tname ); |
| Assert.assertEquals(new Path(table.getSd().getLocation()), tblPathOnTgt); |
| } |
| } |
| } |
| |
| private void verifyTableDataExists(WarehouseInstance warehouse, Path dbDataPath, String tableName, |
| boolean shouldExists) throws IOException { |
| FileSystem fileSystem = FileSystem.get(warehouse.warehouseRoot.toUri(), warehouse.getConf()); |
| Path tablePath = new Path(dbDataPath, tableName); |
| Path dataFilePath = new Path(tablePath, "000000_0"); |
| Assert.assertEquals(shouldExists, fileSystem.exists(dbDataPath)); |
| Assert.assertEquals(shouldExists, fileSystem.exists(tablePath)); |
| Assert.assertEquals(shouldExists, fileSystem.exists(dataFilePath)); |
| } |
| |
| private List<String> getStagingLocationConfig(String stagingLoc, boolean addDistCpConfigs) throws IOException { |
| List<String> confList = new ArrayList<>(); |
| confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc + "'"); |
| if (addDistCpConfigs) { |
| confList.add("'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'"); |
| confList.add("'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'"); |
| confList.add("'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" |
| + UserGroupInformation.getCurrentUser().getUserName() + "'"); |
| } |
| return confList; |
| } |
| |
| /* |
| * Method used from TestTableLevelReplicationScenarios |
| */ |
| private void verifyTableListForPolicy(FileSystem fileSystem, String dumpLocation, String[] tableList) throws Throwable { |
| String hiveDumpLocation = dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; |
| Path tableListFile = new Path(hiveDumpLocation, ReplUtils.REPL_TABLE_LIST_DIR_NAME); |
| tableListFile = new Path(tableListFile, primaryDbName.toLowerCase()); |
| |
| if (tableList == null) { |
| Assert.assertFalse(fileSystem.exists(tableListFile)); |
| return; |
| } else { |
| Assert.assertTrue(fileSystem.exists(tableListFile)); |
| } |
| |
| BufferedReader reader = null; |
| try { |
| InputStream inputStream = fileSystem.open(tableListFile); |
| reader = new BufferedReader(new InputStreamReader(inputStream)); |
| Set tableNames = new HashSet<>(Arrays.asList(tableList)); |
| int numTable = 0; |
| for (String line = reader.readLine(); line != null; line = reader.readLine()) { |
| numTable++; |
| Assert.assertTrue(tableNames.contains(line)); |
| } |
| Assert.assertEquals(numTable, tableList.length); |
| } finally { |
| if (reader != null) { |
| reader.close(); |
| } |
| } |
| } |
| } |