/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.externalTableDataPath;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.NEW_SNAPSHOT;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.OLD_SNAPSHOT;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.deleteSnapshotIfExists;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.firstSnapshot;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.secondSnapshot;
import static org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType.BOOTSTRAP;
import static org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
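/**
 * Tests replication of external tables where the data copy uses HDFS snapshots and snapshot-diff based distCp
 * (REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY), including custom single-copy-task paths
 * (REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS), failure and cleanup scenarios, and snapshot related
 * replication metrics.
 */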
public class TestReplicationScenariosUsingSnapshots extends BaseReplicationAcrossInstances {
String extraPrimaryDb;
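// Class-level setup: include external tables in replication, enable snapshot-diff based copy, run the data copy
// tasks on the source, and copy the external warehouse as a single copy task.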
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
overrides.put(HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname, "true");
overrides.put(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname, "true");
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
}
@Before
public void setup() throws Throwable {
super.setup();
extraPrimaryDb = "extra_" + primaryDbName;
}
@After
public void tearDown() throws Throwable {
primary.run("drop database if exists " + extraPrimaryDb + " cascade");
SnapshottableDirectoryStatus[] snaps =
primary.miniDFSCluster.getFileSystem().getSnapshottableDirListing();
// In case of tests where at the end no directory stays snapshottable, the listing will return null, so ignore;
// else clean up all snapshots to allow MiniDFS to delete the directories during teardown.
if (snaps != null) {
for (SnapshottableDirectoryStatus sn : snaps) {
Path path = sn.getFullPath();
SnapshotUtils.deleteSnapshotSafe(primary.miniDFSCluster.getFileSystem(), path,
firstSnapshot(primaryDbName.toLowerCase()));
SnapshotUtils.deleteSnapshotSafe(primary.miniDFSCluster.getFileSystem(), path,
secondSnapshot(primaryDbName.toLowerCase()));
}
}
primary.miniDFSCluster.getFileSystem().delete(new Path("/"), true);
super.tearDown();
}
@Test
public void testBasicReplicationWithSnapshots() throws Throwable {
// Create a partitioned and non-partitioned table and call dump.
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table table1 (id int)")
.run("insert into table table1 values (1)")
.run("insert into table table1 values (2)")
.run("create external table table2 (place string) partitioned by "
+ "(country string)")
.run("insert into table table2 partition(country='india') values "
+ "('bangalore')")
.run("insert into table table2 partition(country='us') values "
+ "('austin')")
.run("insert into table table2 partition(country='france') values "
+ "('paris')")
.dump(primaryDbName);
// Call load. For the first time, only snapshots would be created and distCp would run from the source snapshot to
// the target.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 'table1'")
.verifyResult("table1")
.run("show tables like 'table2'")
.verifyResult("table2")
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("select country from table2 where country = 'us'")
.verifyResult("us")
.run("select country from table2 where country = 'france'")
.verifyResult("france")
.run("show partitions table2")
.verifyResults(new String[] {"country"
+ "=france", "country=india", "country=us"})
.run("select * from table1").
verifyResults(new String[]{"1","2"});
// Verify Snapshots are created in source.
validateInitialSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// Ckpt should be set on bootstrapped db.
replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
// Create new tables and do a dump; for these too it should do a normal distCp copy from the snapshot directory.
tuple = primary.run("use " + primaryDbName)
.run("create external table table3 (id int)")
.run("insert into table table3 values (10)")
.run("create external table table4 as select id from table3")
.dump(primaryDbName);
// Verify that the table info is written correctly for incremental
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 'table3'")
.verifyResult("table3")
.run("select id from table3")
.verifyResult("10")
.run("select id from table4")
.verifyResult("10");
// Check the new snapshots are created.
validateDiffSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
// Try dropping a table (deleting its directory) and adding data to an already dumped and loaded table, to exercise the snapshot diff.
tuple = primary.run("use " + primaryDbName)
.run("drop table table2")
.run("insert into table1 values (3),(4) ")
.dump(primaryDbName);
// Check if the dropped table isn't there and the new data is available.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select * from table1 order by id")
.verifyResults(new String[]{"1","2","3","4"})
.run("show tables like 'table2'")
.verifyFailure(new String[] {"table2" });
}
@Test
public void testBasicStartFromIncrementalReplication() throws Throwable {
// Run a cycle of dump & load with snapshot disabled.
ArrayList<String> withClause = new ArrayList<>(1);
withClause.add("'"+ REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY + "' = 'false'");
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' ")
.run("insert into t1 partition(country='india') values ('delhi')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show partitions t1")
.verifyResults(new String[] { "country=india"})
.run("select place from t1 order by place")
.verifyResults(new String[] {"delhi"})
.verifyReplTargetProperty(replicatedDbName);
// Check snapshots aren't enabled, since replication ran with snapshot-based copy disabled.
Path locationPath = new Path(primary.getDatabase(primaryDbName).getLocationUri());
DistributedFileSystem dfs = (DistributedFileSystem) locationPath.getFileSystem(conf);
assertFalse(dfs.getFileStatus(locationPath).isSnapshotEnabled());
// Enable snapshot based copy
primary.getConf().setBoolVar(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY, true);
replica.getConf().setBoolVar(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY, true);
// Add some data & then try a dump & load cycle.
primary.run("use " + primaryDbName)
.run("insert into t1 partition(country='india') values ('mumbai')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select place from t1 order by place")
.verifyResults(new String[] {"delhi", "mumbai"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the initial snapshots are created.
validateInitialSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
// Run one more cycle of incremental dump & load to verify diff snapshots are created, and add a new table to
// see if it gets included.
tuple = primary.run("use " + primaryDbName)
.run("insert into t1 partition(country='india') values ('pune')")
.run("create external table t2 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' ")
.run("insert into t2 partition(country='usa') values ('new york')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResult("t2")
.run("show partitions t2")
.verifyResults(new String[] {"country=usa"})
.run("select place from t2 order by place")
.verifyResults(new String[] {"new york"})
.run("select place from t1 order by place")
.verifyResults(new String[] {"delhi", "mumbai", "pune"})
.verifyReplTargetProperty(replicatedDbName);
// Check if diff snapshots are created.
validateDiffSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
}
@Test
public void testBasicExternalTableWithPartitions() throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t2/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t2 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+ "'")
.run("insert into t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResults(new String[] { "t2" })
.run("select place from t2")
.verifyResults(new String[] { "bangalore" })
.verifyReplTargetProperty(replicatedDbName);
// Add new data externally to a partition, under the table-level top directory.
Path partitionDir = new Path(externalTableLocation, "country=india");
try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
outputStream.write("pune\n".getBytes());
outputStream.write("mumbai\n".getBytes());
}
tuple = primary.run("use " + primaryDbName)
.run("insert into t2 partition(country='australia') values ('sydney')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select distinct(country) from t2")
.verifyResults(new String[] { "india", "australia" })
.run("select place from t2 where country='india'")
.verifyResults(new String[] { "bangalore", "pune", "mumbai" })
.run("select place from t2 where country='australia'")
.verifyResults(new String[] { "sydney" })
.run("show partitions t2")
.verifyResults(new String[] {"country=australia", "country=india"})
.verifyReplTargetProperty(replicatedDbName);
String tmpLocation2 = "/tmp1/" + System.nanoTime() + "_2";
primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777"));
// Try alter table location and then dump.
primary.run("use " + primaryDbName)
.run("insert into table t2 partition(country='france') values ('lyon')")
.run("alter table t2 set location '" + tmpLocation2 + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("show partitions t2").verifyResults(
new String[] {"country=australia", "country=india", "country=france"});
// Test alter partition location.
String tmpLocation3 = "/tmp1/" + System.nanoTime() + "_3";
primary.miniDFSCluster.getFileSystem()
.mkdirs(new Path(tmpLocation3), new FsPermission("777"));
primary.run("use " + primaryDbName).run(
"alter table t2 partition (country='australia') set location '"
+ tmpLocation3 + "'").run(
"insert into table t2 partition(country='australia') values "
+ "('sydney')").dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("select place from t2 where country='australia'")
.verifyResult("sydney");
}
@Test
public void testSnapshotCleanupsOnDatabaseLocationChange() throws Throwable {
Path externalDatabaseLocation = new Path("/" + testName.getMethodName() + "/externalDatabase/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalDatabaseLocation, new FsPermission("777"));
Path externalDatabaseLocationAlter = new Path("/" + testName.getMethodName() + "/externalDatabaseAlter/");
fs.mkdirs(externalDatabaseLocationAlter, new FsPermission("777"));
Path externalDatabaseLocationDest =
new Path(REPLICA_EXTERNAL_BASE, testName.getMethodName() + "/externalDatabase/");
Path externalDatabaseLocationAlterDest =
new Path(REPLICA_EXTERNAL_BASE, testName.getMethodName() + "/externalDatabaseAlter/");
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
// Create a normal and partitioned table in the database location.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("alter database " + primaryDbName + " set location '" + externalDatabaseLocation + "'")
.run("create external table emp1 (id int)")
.run("insert into emp1 values(1),(2)")
.run("create external table t2 (place string) partitioned by (country "
+ "string) row format delimited fields terminated by ','")
.run("insert into t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName, withClause);
// Do a load; after that, a snapshot should have been created at the source database location.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'emp1'")
.verifyResults(new String[] {"emp1"})
.run("select id from emp1")
.verifyResults(new String[] {"1", "2"})
.run("select place from t2")
.verifyResults(new String[] {"bangalore"})
.verifyReplTargetProperty(replicatedDbName);
// Check the directory is snapshottable.
assertTrue(fs.getFileStatus(externalDatabaseLocation).isSnapshotEnabled());
// Check if the snapshot exists at source.
assertNotNull(fs.getFileStatus(
new Path(externalDatabaseLocation, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
// Check if the destination is snapshottable.
assertTrue(fs.getFileStatus(externalDatabaseLocationDest).isSnapshotEnabled());
// Check if the snapshot exists at the destination.
assertNotNull(fs.getFileStatus(
new Path(externalDatabaseLocationDest, ".snapshot/" + firstSnapshot(primaryDbName.toLowerCase()))));
// Alter database location and create another table inside it.
tuple = primary.run("use " + primaryDbName).run(
"alter database " + primaryDbName + " set location '"
+ externalDatabaseLocationAlter + "'")
.run("create external table empnew (id int)")
.run("insert into empnew values (3),(4)").run("drop table emp")
.run("insert into t2 partition(country='france') " + "values('paris')")
.dump(primaryDbName, withClause);
// Do a load and see if everything is correct.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'emp'")
.verifyFailure(new String[] {"emp"})
.run("show tables like 'empnew'")
.verifyResults(new String[] {"empnew"})
.run("select id from empnew")
.verifyResults(new String[] {"3", "4"})
.run("select place from t2 where country='france'")
.verifyResults(new String[] {"paris"})
.run("select place from t2 where country='india'")
.verifyResults(new String[] {"bangalore"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the new database location is snapshottable.
assertTrue(fs.getFileStatus(externalDatabaseLocationAlter).isSnapshotEnabled());
// Check if the snapshot exists at source.
assertNotNull(fs.getFileStatus(
new Path(externalDatabaseLocationAlter, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
// Check if the old snapshot got deleted at source.
LambdaTestUtils.intercept(FileNotFoundException.class, () -> fs
.getFileStatus(new Path(externalDatabaseLocation, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
// Check if the new destination database location is snapshottable.
assertTrue(fs.getFileStatus(externalDatabaseLocationAlterDest).isSnapshotEnabled());
// Check if the snapshot exists at the destination.
assertNotNull(fs.getFileStatus(
new Path(externalDatabaseLocationAlterDest, ".snapshot/" + firstSnapshot(primaryDbName.toLowerCase()))));
// Check if the old snapshot at the destination is deleted.
LambdaTestUtils.intercept(FileNotFoundException.class, () -> fs.getFileStatus(
new Path(externalDatabaseLocationDest, ".snapshot/" + firstSnapshot(primaryDbName.toLowerCase()))));
}
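// Covers dump failures when a snapshot can't be created at the source or at the destination: a non-recoverable
// error marker is written, and replication recovers once the offending directory layout is fixed.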
@Test
public void testFailureScenarios() throws Throwable {
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
// Create a table for which snapshot creation will fail during dump.
Path externalTableLocationSource = new Path("/" + testName.getMethodName() + "source1/tablesource/");
fs.mkdirs(externalTableLocationSource, new FsPermission("777"));
// Allow snapshots on the parent of the table directory; snapshot creation will then fail for the table directory
// during dump, since nested snapshottable directories aren't allowed.
fs.allowSnapshot(externalTableLocationSource.getParent());
// Create a table for which a snapshot can not be created on the destination.
Path externalTableLocationDest = new Path("/" + testName.getMethodName() + "dest1/tabledest/");
fs.mkdirs(externalTableLocationDest, new FsPermission("777"));
Path externalTableLocationDestInDest = new Path(REPLICA_EXTERNAL_BASE, testName.getMethodName() + "dest1/tabledest/");
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocationSource
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
try {
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table tablesource (place string) row format delimited fields terminated by ',' location '"
+ externalTableLocationSource.toString() + "'")
.run("insert into tablesource values ('bangalore')")
.dump(primaryDbName, withClause);
fail("Should have thrown snapshot exception");
} catch (SnapshotException se) {
// Ignore
}
// Check that a non-recoverable error marker got created.
Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
Path nonRecoverablePath =
TestReplicationScenarios.getNonRecoverablePath(baseDumpDir, primaryDbName, primary.hiveConf);
assertTrue(fs.exists(nonRecoverablePath));
// Fix the table and dump & load, which should succeed.
fs.disallowSnapshot(externalTableLocationSource.getParent());
fs.delete(nonRecoverablePath, true);
primary.dump(primaryDbName, withClause);
// Load and check if the data is there; we should have fallen back to the normal copy mode.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from tablesource")
.verifyResults(new String[] {"bangalore"})
.verifyReplTargetProperty(replicatedDbName);
// Create a table for which the target is non-snapshottable.
// Allow snapshots on the parent of the destination.
fs.mkdirs(externalTableLocationDestInDest.getParent());
fs.allowSnapshot(externalTableLocationDestInDest.getParent());
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocationSource
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocationDest
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
WarehouseInstance.Tuple tuple;
try {
tuple = primary.run("use " + primaryDbName).run(
"create external table tabledest (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocationDest.toString() + "'")
.run("insert into tabledest partition(country='india') values ('kanpur')")
.run("insert into tabledest partition(country='india') values ('lucknow')")
.run("insert into tabledest partition(country='usa') values ('New York')")
.run("insert into tablesource values('chennai')")
.dump(primaryDbName, withClause);
fail("Expected a snapshot exception.");
} catch (SecurityException se) {
// This is expected!!!
}
nonRecoverablePath =
TestReplicationScenarios.getNonRecoverablePath(baseDumpDir, primaryDbName, primary.hiveConf);
assertTrue(fs.exists(nonRecoverablePath));
fs.delete(nonRecoverablePath, true);
fs.disallowSnapshot(externalTableLocationDestInDest.getParent());
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from tabledest where country='usa'")
.verifyResults(new String[] {"New York"})
.run("select place from tabledest where country='india'")
.verifyResults(new String[] { "kanpur", "lucknow" })
.run("select place from tablesource")
.verifyResults(new String[] {"bangalore", "chennai"});
// Add a new table for which snapshot creation will fail, after snapshots for the other tables have already been created.
Path externalTableLocationSource2 = new Path("/" + testName.getMethodName() + "source2/tablesource/");
fs.mkdirs(externalTableLocationSource2, new FsPermission("777"));
fs.allowSnapshot(externalTableLocationSource2.getParent());
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocationSource
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocationDestInDest
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocationSource2
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
try {
tuple = primary.run("use " + primaryDbName)
.run("create external table tablesource2 (place string) row format delimited fields terminated by ',' "
+ "location '" + externalTableLocationSource2.toString() + "'")
.run("insert into tablesource2 values ('delhi')")
.run("insert into tablesource2 values ('noida')")
.run("insert into tabledest partition(country='usa') values ('San Jose')")
.run("insert into tablesource values('kolkata')")
.dump(primaryDbName, withClause);
fail("Expected a snapshot exception.");
} catch (SnapshotException se) {
// Expected, Ignore
}
nonRecoverablePath = TestReplicationScenarios.getNonRecoverablePath(baseDumpDir, primaryDbName, primary.hiveConf);
assertTrue(fs.exists(nonRecoverablePath));
fs.delete(nonRecoverablePath, true);
fs.disallowSnapshot(externalTableLocationSource2.getParent());
primary
.run("insert into tablesource values('patna')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from tabledest where country='usa'")
.verifyResults(new String[] {"New York", "San Jose"})
.run("select place from tabledest where country='india'")
.verifyResults(new String[] { "kanpur", "lucknow" })
.run("select place from tablesource")
.verifyResults(new String[] {"bangalore", "chennai", "kolkata", "patna"})
.run("select place from tablesource2")
.verifyResults(new String[] {"delhi", "noida"});
}
@Test
public void testCustomPathTableSnapshots() throws Throwable {
Path externalTableLocation1 = new Path("/" + testName.getMethodName() + "/t1/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation1, new FsPermission("777"));
Path externalTableLocation2 = new Path("/" + testName.getMethodName() + "/t2/");
fs.mkdirs(externalTableLocation2, new FsPermission("777"));
Path externalTablePart1 = new Path("/" + testName.getMethodName() + "/part1/");
fs.mkdirs(externalTablePart1, new FsPermission("777"));
Path externalTablePart2 = new Path("/" + testName.getMethodName() + "/part2/");
fs.mkdirs(externalTablePart2, new FsPermission("777"));
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocation2
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTablePart1
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (place string) partitioned by (country"
+ " string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation1.toString()
+ "'")
.run("insert into t1 partition(country='india') values ('bangalore')")
.run("create external table t2 (place string) partitioned by (country"
+ " string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation2.toString()
+ "'")
.run("insert into t1 partition(country='australia') values "
+ "('sydney')")
.run("insert into t2 partition(country='nepal') values "
+ "('kathmandu')")
.run("alter table t1 add partition (country='france') location '"+externalTablePart1.toString() + "'")
.run("insert into t1 partition(country='france') values ('paris')")
.run("alter table t2 add partition (country='china') location '"+externalTablePart2.toString() + "'")
.run("insert into t2 partition(country='china') values ('beejing')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"})
.run("show tables like 't2'")
.verifyResults(new String[] {"t2"})
.run("select place from t1 where country='india'")
.verifyResults(new String[] {"bangalore"})
.run("select place from t1 where country='france'")
.verifyResults(new String[] {"paris"})
.run("select place from t2 where country='nepal'")
.verifyResults(new String[] {"kathmandu"})
.run("select place from t2 where country='china'")
.verifyResults(new String[] {"beejing"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the t2 directory is snapshottable and the snapshot is there.
validateInitialSnapshotsCreated(externalTableLocation2.toString());
// Check if the partition directory is snapshottable and has snapshots
validateInitialSnapshotsCreated(externalTablePart1.toString());
// Check that the table location not specified in the configuration is not snapshottable.
assertFalse(fs.getFileStatus(externalTableLocation1).isSnapshotEnabled());
// Check that the partition location not specified in the configuration is not snapshottable.
assertFalse(fs.getFileStatus(externalTablePart2).isSnapshotEnabled());
// Drop the snapshotted table, create another table at the same location, and add the other table's path to the config.
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocation2
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocation1
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()+ "," + externalTablePart1
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
primary.run("use " + primaryDbName)
.run("drop table t2")
.run("create external table t4 (place string) partitioned by (country string) row format delimited fields"
+ " terminated by ',' location '" + externalTableLocation2.toString() + "'")
.run("MSCK REPAIR TABLE t4 ADD PARTITIONS")
.run("insert into t4 partition(country='china') values ('Shanghai')") // new partition in the recreated table.
.run("insert into t4 partition(country='nepal') values ('pokhra')") // entry in old existing partition in recreated table.
.run("insert into t1 partition(country='india') values ('chennai')") // entry for newely added path in list.
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyFailure(new String[] {"t1"})
.run("select place from t1 where country='india'")
.verifyResults(new String[] {"bangalore", "chennai"})
.run("select place from t1 where country='france'")
.verifyResults(new String[] {"paris"})
.run("select place from t4 where country='nepal'")
.verifyResults(new String[] {"kathmandu", "pokhra"}) // Old and new entry both should be there
.run("select place from t4 where country='china'")
.verifyResults(new String[] {"Shanghai"}) // the newely added partition should be there.
.verifyReplTargetProperty(replicatedDbName);
// The newly added path should have the initial snapshot
validateInitialSnapshotsCreated(externalTableLocation1.toString());
// The old path should have diff snapshots even though the table got dropped, since the new table uses the same path.
validateDiffSnapshotsCreated(externalTableLocation2.toString());
}
@Test
public void testCustomPathTableSnapshotsCleanup() throws Throwable {
Path externalTableLocation1 = new Path("/" + testName.getMethodName() + "/t1/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation1, new FsPermission("777"));
Path externalTableLocation1Target = new Path("/replica_external_base/" + testName.getMethodName() + "/t1/");
Path externalTableLocation2 = new Path("/" + testName.getMethodName() + "/t2/");
fs.mkdirs(externalTableLocation2, new FsPermission("777"));
Path externalTableLocation2Target = new Path("/replica_external_base/" + testName.getMethodName() + "/t2/");
Path externalTableLocation2New = new Path("/" + testName.getMethodName() + "/t2alter/");
fs.mkdirs(externalTableLocation2New, new FsPermission("777"));
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocation2
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocation1
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation1.toString() +"'")
.run("insert into t1 partition(country='india') values ('bangalore')")
.run("create external table t2 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation2.toString() + "'")
.run("insert into t2 partition(country='nepal') values ('kathmandu')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"})
.run("show tables like 't2'")
.verifyResults(new String[] {"t2"})
.run("select place from t1 where country='india'")
.verifyResults(new String[] {"bangalore"})
.run("select place from t2 where country='nepal'")
.verifyResults(new String[] {"kathmandu"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the t1 directory is snapshottable and the snapshot is there.
assertTrue(fs.getFileStatus(externalTableLocation1).isSnapshotEnabled());
assertNotNull(
fs.getFileStatus(new Path(externalTableLocation1, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
assertTrue(fs.getFileStatus(externalTableLocation1Target).isSnapshotEnabled());
// Check if the t2 directory is snapshottable and the snapshot is there.
assertTrue(fs.getFileStatus(externalTableLocation2).isSnapshotEnabled());
assertNotNull(
fs.getFileStatus(new Path(externalTableLocation2, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
assertTrue(fs.getFileStatus(externalTableLocation2Target).isSnapshotEnabled());
// Drop table t1 and alter the location of table t2
tuple = primary.run("use " + primaryDbName)
.run("drop table t1")
.run("alter table t2 SET LOCATION '" + externalTableLocation2New +"'")
.run("alter table t2 drop partition(country='nepal')")
.run("insert into t2 partition(country='china') values ('beejing')")
.dump(primaryDbName, withClause);
// Load and check if the data is there and the dropped table isn't there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1"})
.run("show tables like 't2'")
.verifyResults(new String[] {"t2"})
.run("select place from t2 where country='china'")
.verifyResults(new String[] {"beejing"})
.verifyReplTargetProperty(replicatedDbName);
// Verify the new location is not snapshottable
assertFalse(fs.getFileStatus(externalTableLocation2New).isSnapshotEnabled());
// Verify the old locations are no longer snapshottable and the snapshots are cleaned up.
assertFalse(fs.getFileStatus(externalTableLocation1).isSnapshotEnabled());
assertFalse(fs.getFileStatus(externalTableLocation1Target).isSnapshotEnabled());
assertFalse(fs.getFileStatus(externalTableLocation2).isSnapshotEnabled());
assertFalse(fs.getFileStatus(externalTableLocation2Target).isSnapshotEnabled());
}
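// Verifies that replication still produces correct data on the target after the target directory is modified out
// of band (a table directory deleted, files added or overwritten between load cycles).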
@Test
public void testTargetModified() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table table1 (id int)")
.run("insert into table table1 values (1)")
.run("insert into table table1 values (2)")
.dump(primaryDbName);
// Call load. For the first time, only snapshots would be created and distCp would run from the source snapshot to
// the target.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 'table1'")
.verifyResult("table1")
.run("select * from table1").
verifyResults(new String[]{"1","2"});
// Verify Snapshots are created in source.
validateInitialSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
// Create a new table and do dump
tuple = primary.run("use " + primaryDbName)
.run("create external table table3 (id int)")
.run("insert into table table3 values (10)")
.run("insert into table table1 values (3)")
.dump(primaryDbName);
// Verify that the table info is written correctly for incremental
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 'table3'")
.verifyResult("table3")
.run("select id from table3")
.verifyResult("10")
.run("select id from table1")
.verifyResults(new String[] {"1", "2", "3"});
// Check the new snapshots are created.
validateDiffSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
// Now modify the target by deleting one table directory
Path targetWhPath = externalTableDataPath(conf, REPLICA_EXTERNAL_BASE,
new Path(primary.getDatabase(primaryDbName).getLocationUri()));
DistributedFileSystem replicaDfs = (DistributedFileSystem) targetWhPath.getFileSystem(conf);
assertTrue(replicaDfs.delete(new Path(targetWhPath, "table1"), true));
// Add some data to the table and do a Dump & Load
tuple = primary.run("use " + primaryDbName)
.run("insert into table table3 values (11)")
.run("insert into table table1 values (4)")
.dump(primaryDbName);
// Load and see the data is there.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select id from table3")
.verifyResults(new String[]{"10","11"})
.run("select id from table1")
.verifyResults(new String[] {"1", "2", "3", "4"});
// Modify the target by adding and modifying data
replicaDfs.create(new Path(targetWhPath, "table1/000000_0_new")).close();
replicaDfs.create(new Path(targetWhPath, "table1/000000_0"), true).close();
// Add some more data to table1
tuple = primary.run("use " + primaryDbName)
.run("insert into table table1 values (5)")
.dump(primaryDbName);
// Load and see the new data is there.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select id from table1")
.verifyResults(new String[] {"1", "2", "3", "4", "5"});
}
@Test
public void testSnapshotMetrics() throws Throwable {
conf.set(Constants.SCHEDULED_QUERY_SCHEDULENAME, "metrics_test");
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
MetricCollector collector = MetricCollector.getInstance();
Path externalDatabaseLocation = new Path("/" + testName.getMethodName() + "/externalDatabase/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalDatabaseLocation, new FsPermission("777"));
Path externalTableLocation1 = new Path("/" + testName.getMethodName() + "/t1/");
fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation1, new FsPermission("777"));
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocation1
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table emp1 (id int)")
.run("insert into emp1 values(1),(2)")
.run("create external table exttab (place string) row format delimited fields terminated by ','"
+ " location '" + externalTableLocation1.toString() +"'")
.run("insert into exttab values('lucknow')")
.dump(primaryDbName, withClause);
// The bootstrap stage: snapshots are enabled for 2 directories, the database directory and the one table directory
// specified in the config. This is the initial copy, so only 1 snapshot per directory is created and none are deleted.
assertIncrementalMetricsValues(BOOTSTRAP, collector, 2, 0);
Iterator<ReplicationMetric> itr = collector.getMetrics().iterator();
while (itr.hasNext()) {
ReplicationMetric elem = itr.next();
assertEquals(BOOTSTRAP, elem.getMetadata().getReplicationType());
List<Stage> stages = elem.getProgress().getStages();
for (Stage stage : stages) {
SnapshotUtils.ReplSnapshotCount counts = stage.getReplSnapshotCount();
assertEquals(2, counts.getNumCreated());
assertEquals(0, counts.getNumDeleted());
}
}
// Load and check if the data and table are there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'emp1'")
.verifyResults(new String[] {"emp1"})
.run("select id from emp1")
.verifyResults(new String[] {"1", "2"})
.run("show tables like 'exttab'")
.verifyResults(new String[]{"exttab"})
.run("select place from exttab")
.verifyResults(new String[] {"lucknow"})
.verifyReplTargetProperty(replicatedDbName);
// Add some data and try incremental dump.
tuple = primary.run("use " + primaryDbName)
.run("insert into emp1 values(3),(4)")
.run("insert into exttab values('agra')")
.dump(primaryDbName, withClause);
// This is the diff stage: for each of the 2 directories with snapshots enabled, 1 old snapshot gets deleted and 1 new
// one gets created, so 2 created and 2 deleted in total.
assertIncrementalMetricsValues(INCREMENTAL, collector, 2, 2);
// Do a load
replica.load(replicatedDbName, primaryDbName, withClause);
// Remove the with clause, and with it the external table path specified as part of the config.
tuple = primary.run("use " + primaryDbName)
.run("insert into exttab values('lucknow')")
.dump(primaryDbName, null);
// Only one directory, i.e. the database directory, now goes through snapshot based replication, so 1 snapshot is
// created and 1 old one deleted for it, plus 2 deleted for the table removed from the snapshot based replication scope.
assertIncrementalMetricsValues(INCREMENTAL, collector, 1, 3);
}
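// Asserts that every collected replication metric has the given replication type and that each stage reports the
// expected number of snapshots created and deleted.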
private void assertIncrementalMetricsValues(Metadata.ReplicationType replicationType, MetricCollector collector, int numCreated,
int numDeleted) {
Iterator<ReplicationMetric> itr;
itr = collector.getMetrics().iterator();
while (itr.hasNext()) {
ReplicationMetric elem = itr.next();
assertEquals(replicationType, elem.getMetadata().getReplicationType());
List<Stage> stages = elem.getProgress().getStages();
for (Stage stage : stages) {
SnapshotUtils.ReplSnapshotCount count = stage.getReplSnapshotCount();
assertEquals(numCreated, count.getNumCreated());
assertEquals(numDeleted, count.getNumDeleted());
}
}
}
@Test
public void testPurgeAndReBootstrap() throws Throwable {
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
Path externalTableLocation1 = new Path("/" + testName.getMethodName() + "/table1/");
fs.mkdirs(externalTableLocation1, new FsPermission("777"));
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocation1
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table table1 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation1.toString() + "'")
.run("ALTER TABLE table1 SET TBLPROPERTIES ('external.table.purge'='true')")
.run("create external table table2 (id int)")
.run("create external table table3 (id int)")
.run("ALTER TABLE table3 SET TBLPROPERTIES ('external.table.purge'='true')")
.run("insert into table1 partition(country='nepal') values ('kathmandu')")
.run("insert into table1 partition(country='china') values ('beejing')")
.run("insert into table2 values(1)")
.run("insert into table3 values(5)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'table1'")
.verifyResults(new String[] {"table1"})
.run("select place from table1 where country='nepal'")
.verifyResults(new String[] {"kathmandu"})
.run("select place from table1 where country='china'")
.verifyResults(new String[] {"beejing"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the table1 directory is snapshottable and the snapshot is there.
validateInitialSnapshotsCreated(externalTableLocation1.toString());
// Add some more data and do a dump & load
primary.run("use " + primaryDbName)
.run("insert into table1 partition(country='china') values ('wuhan')")
.run("insert into table2 values(2)")
.run("insert into table3 values(6)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from table1 where country='china'")
.verifyResults(new String[] {"beejing", "wuhan"})
.verifyReplTargetProperty(replicatedDbName);
// Verify the diff snapshots are there.
validateDiffSnapshotsCreated(externalTableLocation1.toString());
assertTrue(fs.listStatus(externalTableLocation1).length > 0);
// Now purge the table
primary.run("use " + primaryDbName)
.run("drop table table1 purge")
.run("insert into table2 values(3)")
.run("insert into table3 values(7)")
.dump(primaryDbName, withClause);
// Check the table directory got removed at the source by the purge.
assertFalse(fs.exists(externalTableLocation1));
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'table1'")
.verifyFailure(new String[] {"table1"})
.run("select id from table2")
.verifyResults(new String[] {"1", "2", "3"})
.run("select id from table3")
.verifyResults(new String[] {"5", "6", "7"})
.verifyReplTargetProperty(replicatedDbName);
Path externalDbLocation = new Path(replica.getDatabase(replicatedDbName).getLocationUri());
assertTrue(fs.exists(externalDbLocation));
replica.run("use " + replicatedDbName)
.run("drop table table2")
.run("drop table table3")
.run("drop database "+ replicatedDbName + " cascade");
assertFalse(fs.exists(externalDbLocation));
// Delete the dump directory to force a bootstrap.
Path parent = new Path(tuple.dumpLocation).getParent().getParent();
fs.delete(parent, true);
// Delete the external database dir at the target.
Path dbLocation = new Path(REPLICA_EXTERNAL_BASE,
Path.getPathWithoutSchemeAndAuthority(new Path(primary.getDatabase(primaryDbName).getLocationUri())).toString()
.replaceFirst("/", ""));
deleteReplRelatedSnapshots(fs, dbLocation, conf);
fs.delete(REPLICA_EXTERNAL_BASE, true);
primary.run("use " + primaryDbName)
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'table1'")
.verifyFailure(new String[] {"table1"})
.run("select id from table2")
.verifyResults(new String[] {"1", "2", "3"})
.run("select id from table3")
.verifyResults(new String[] {"5", "6", "7"})
.verifyReplTargetProperty(replicatedDbName);
validateInitialSnapshotsCreated(primary.getDatabase(primaryDbName).getLocationUri());
}
@Test
public void testSnapshotsWithCustomDbLevelPaths() throws Throwable {
// Create table1 inside database warehouse location, table2 outside db location and non snapshottable, table3
// outside database location but snapshottable, table4 and table5 inside a custom db path and snapshottable,
// table6 and table7 inside a custom db path but not snapshottable.
Path externalTableLocation2 = new Path("/" + testName.getMethodName() + "/table2/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation2, new FsPermission("777"));
Path externalTableLocation3 = new Path("/" + testName.getMethodName() + "/table3/");
fs.mkdirs(externalTableLocation3, new FsPermission("777"));
Path externalCustomDb1Path = new Path("/" + testName.getMethodName() + "/customDb1/");
Path externalCustomDb2Path = new Path("/" + testName.getMethodName() + "/customDb2/");
Path externalTableLocation4 = new Path(externalCustomDb1Path, "table4");
fs.mkdirs(externalTableLocation4, new FsPermission("777"));
Path externalTableLocation5 = new Path(externalCustomDb1Path, "table5");
fs.mkdirs(externalTableLocation5, new FsPermission("777"));
Path externalTableLocation6 = new Path(externalCustomDb2Path, "table6");
fs.mkdirs(externalTableLocation6, new FsPermission("777"));
Path externalTableLocation7 = new Path(externalCustomDb2Path, "table7");
fs.mkdirs(externalTableLocation7, new FsPermission("777"));
// Specify the customDb1 location and table3 as custom paths for the single copy task; these become snapshottable,
// while customDb2 is left out.
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'"
+ REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS.varname + "'='" + externalCustomDb1Path
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocation3
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table table1 (id int)")
.run("create external table table2 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation2.toString() +"'")
.run("create external table table3 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation3.toString() +"'")
.run("create external table table4 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation4.toString() +"'")
.run("create external table table5 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation5.toString() +"'")
.run("create external table table6 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation6.toString() +"'")
.run("create external table table7 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation7.toString() +"'")
.run("insert into table2 partition(country='india') values ('bangalore')")
.run("insert into table3 partition(country='china') values ('beejing')")
.run("insert into table4 partition(country='usa') values ('new york')")
.run("insert into table5 partition(country='japan') values ('tokyo')")
.run("insert into table6 partition(country='nepal') values ('kathmandu')")
.run("insert into table7 partition(country='australia') values ('sydney')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from table2 where country='india'")
.verifyResults(new String[] {"bangalore"})
.run("select place from table3 where country='china'")
.verifyResults(new String[] {"beejing"})
.run("select place from table4 where country='usa'")
.verifyResults(new String[] {"new york"})
.run("select place from table5 where country='japan'")
.verifyResults(new String[] {"tokyo"})
.run("select place from table6 where country='nepal'")
.verifyResults(new String[] {"kathmandu"})
.run("select place from table7 where country='australia'")
.verifyResults(new String[] {"sydney"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the table3 and customDb1 location directories are snapshottable and the snapshots are there.
validateInitialSnapshotsCreated(externalTableLocation3.toString());
validateInitialSnapshotsCreated(externalCustomDb1Path.toString());
// Check if others aren't snapshottable.
assertFalse(fs.getFileStatus(externalCustomDb2Path).isSnapshotEnabled());
assertFalse(fs.getFileStatus(externalTableLocation2).isSnapshotEnabled());
// Add some more data and do a dump & load so as to create diff snapshots.
tuple = primary.run("use " + primaryDbName)
.run("insert into table2 partition(country='india') values ('chennai')")
.run("insert into table3 partition(country='china') values ('chengdu')")
.run("insert into table4 partition(country='usa') values ('washington')")
.run("select place from table4 where country='usa'")
.verifyResults(new String[] {"new york", "washington"})
.run("insert into table5 partition(country='japan') values ('osaka')")
.run("insert into table6 partition(country='nepal') values ('pokhra')")
.run("insert into table7 partition(country='australia') values ('perth')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'table%'")
.verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5", "table6", "table7"})
.run("select place from table2 where country='india'")
.verifyResults(new String[] {"bangalore", "chennai"})
.run("select place from table3 where country='china'")
.verifyResults(new String[] {"beejing", "chengdu"})
.run("select place from table4 where country='usa'")
.verifyResults(new String[] {"new york", "washington"})
.run("select place from table5 where country='japan'")
.verifyResults(new String[] {"tokyo", "osaka"})
.run("select place from table6 where country='nepal'")
.verifyResults(new String[] {"kathmandu", "pokhra"})
.run("select place from table7 where country='australia'")
.verifyResults(new String[] {"sydney", "perth"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the diff snapshots got created for the paths.
validateDiffSnapshotsCreated(externalTableLocation3.toString());
validateDiffSnapshotsCreated(externalCustomDb1Path.toString());
// Check that the others don't become snapshottable during the incremental.
assertFalse(fs.getFileStatus(externalCustomDb2Path).isSnapshotEnabled());
assertFalse(fs.getFileStatus(externalTableLocation2).isSnapshotEnabled());
// Add some data to the customDb locations, do a dump & load cycle, and see that the diff copy is done.
tuple = primary.run("use " + primaryDbName)
.run("insert into table4 partition(country='usa') values ('chicago')")
.run("drop table table5 purge")
.run("insert into table6 partition(country='nepal') values ('lalitpur')")
.run("insert into table7 partition(country='australia') values ('adelaide')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'table%'")
.verifyResults(new String[] {"table1", "table2", "table3", "table4", "table6", "table7"})
.run("select place from table4 where country='usa'")
.verifyResults(new String[] {"new york", "washington", "chicago"})
.run("select place from table6 where country='nepal'")
.verifyResults(new String[] {"kathmandu", "pokhra", "lalitpur"})
.run("select place from table7 where country='australia'")
.verifyResults(new String[] {"sydney", "perth", "adelaide"})
.verifyReplTargetProperty(replicatedDbName);
// Check that the diff snapshots are there for the table3 and customDb1 locations.
validateDiffSnapshotsCreated(externalTableLocation3.toString());
validateDiffSnapshotsCreated(externalCustomDb1Path.toString());
// Add the other custom db path for creating snapshots and remove the already configured path as snapshottable.
// The new db location should become snapshottable and the snapshots should be cleared for the other db location.
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS.varname + "'='" + externalCustomDb2Path
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "," + externalTableLocation3
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
primary.run("use " + primaryDbName)
.run("insert into table4 partition(country='usa') values ('austin')")
.run("insert into table6 partition(country='nepal') values ('janakpur')")
.run("insert into table7 partition(country='australia') values ('darwin')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'table%'")
.verifyResults(new String[] {"table1", "table2", "table3", "table4", "table6", "table7"})
.run("select place from table4 where country='usa'")
.verifyResults(new String[] {"new york", "washington", "chicago", "austin"})
.run("select place from table6 where country='nepal'")
.verifyResults(new String[] {"kathmandu", "pokhra", "lalitpur", "janakpur"})
.run("select place from table7 where country='australia'")
.verifyResults(new String[] {"sydney", "perth", "adelaide", "darwin"})
.verifyReplTargetProperty(replicatedDbName);
// Check that the table3 directory has diff snapshots and the customDb2 path became snapshottable.
validateDiffSnapshotsCreated(externalTableLocation3.toString());
validateInitialSnapshotsCreated(externalCustomDb2Path.toString());
// Check that the others don't become snapshottable and the snapshots are cleared for the first custom db location.
assertFalse(fs.getFileStatus(externalCustomDb1Path).isSnapshotEnabled());
assertFalse(fs.getFileStatus(externalTableLocation2).isSnapshotEnabled());
}
// Verifies that both the old and new snapshots exist at the given location, i.e. the diff snapshots got created.
private void validateDiffSnapshotsCreated(String location) throws Exception {
Path locationPath = new Path(location);
DistributedFileSystem dfs = (DistributedFileSystem) locationPath.getFileSystem(conf);
assertNotNull(dfs.getFileStatus(new Path(locationPath, ".snapshot/" + firstSnapshot(primaryDbName.toLowerCase()))));
assertNotNull(
dfs.getFileStatus(new Path(locationPath, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
}
@Test
public void testSnapshotsWithFiltersCustomDbLevelPaths() throws Throwable {
// Directory Structure:
// /prefix/project/ <- Specified as custom Location.(Snapshot Root)
// /randomStuff <- Not to be copied as part of external data copy
// /warehouse1 <- To be copied, Contains table1 & table2
// /warehouse2 <- To be copied, Contains table3 & table4
// Create /prefix/project
Path project = new Path("/" + testName.getMethodName() + "/project");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(project);
// Create /prefix/project/warehouse1
Path warehouse1 = new Path(project, "warehouse1");
fs.mkdirs(warehouse1);
// Create /prefix/project/warehouse2
Path warehouse2 = new Path(project, "warehouse2");
fs.mkdirs(warehouse2);
// Table1 Path: /prefix/project/warehouse1/table1
Path table1 = new Path(warehouse1, "table1");
fs.mkdirs(table1);
// Table2 Path: /prefix/project/warehouse1/table2
Path table2 = new Path(warehouse1, "table2");
fs.mkdirs(table2);
// Table3 Path: /prefix/project/warehouse2/table3
Path table3 = new Path(warehouse2, "table3");
fs.mkdirs(table3);
// Table4 Path: /prefix/project/warehouse2/table4
Path table4 = new Path(warehouse2, "table4");
fs.mkdirs(table4);
// Random Dir inside the /prefix/project
Path random = new Path(project, "randomStuff");
fs.mkdirs(random);
fs.create(new Path(random, "file1")).close();
fs.create(new Path(random, "file2")).close();
fs.create(new Path(random, "file3")).close();
// Create a filter file for DistCp
String filterFilePath = "/tmp/filter";
FileWriter myWriter = new FileWriter(filterFilePath);
myWriter.write(".*randomStuff.*");
myWriter.close();
assertTrue(new File(filterFilePath).exists());
// Specify the project directory as the snapshot root using the single copy task path config.
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'"
+ REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS.varname + "'='" + project
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
// Add Filter file
withClause.add("'distcp.options.filters'='" + filterFilePath + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table table1 (place string) row format "
+ "delimited fields terminated by ',' location '" + table1.toString() +"'")
.run("create external table table2 (place string) row format "
+ "delimited fields terminated by ',' location '" + table2.toString() +"'")
.run("create external table table3 (place string) row format "
+ "delimited fields terminated by ',' location '" + table3.toString() +"'")
.run("create external table table4 (place string) row format "
+ "delimited fields terminated by ',' location '" + table4.toString() +"'")
.run("insert into table1 values ('bangalore')")
.run("select place from table1")
.verifyResults(new String[] {"bangalore"})
.run("insert into table2 values ('beejing')")
.run("insert into table3 values ('new york')")
.run("insert into table4 values ('tokyo')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from table1")
.verifyResults(new String[] {"bangalore"})
.run("select place from table2")
.verifyResults(new String[] {"beejing"})
.run("select place from table3")
.verifyResults(new String[] {"new york"})
.run("select place from table4")
.verifyResults(new String[] {"tokyo"})
.verifyReplTargetProperty(replicatedDbName);
// Check if the initial snapshot got created for project dir
validateInitialSnapshotsCreated(project.toString());
// Check if the randomStuff Directory didn't get copied.
assertFalse(fs.exists(new Path(REPLICA_EXTERNAL_BASE, random.toUri().getPath().replaceFirst("/", ""))));
// Diff mode of the snapshot copy.
tuple = primary.run("use " + primaryDbName)
.run("insert into table1 values ('delhi')")
.run("insert into table2 values ('wuhan')")
.run("insert into table3 values ('washington')")
.run("insert into table4 values ('osaka')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from table1")
.verifyResults(new String[] {"bangalore", "delhi"})
.run("select place from table2")
.verifyResults(new String[] {"beejing", "wuhan"})
.run("select place from table3")
.verifyResults(new String[] {"new york", "washington"})
.run("select place from table4")
.verifyResults(new String[] {"tokyo", "osaka"})
.verifyReplTargetProperty(replicatedDbName);
// Check if diff snapshot got created.
validateDiffSnapshotsCreated(project.toString());
// Check if the randomStuff Directory didn't get copied, post diff copy
assertFalse(fs.exists(new Path(REPLICA_EXTERNAL_BASE, random.toUri().getPath().replaceFirst("/", ""))));
// Clean up the filter file.
new File(filterFilePath).delete();
}
// Verifies that the initial round of snapshots got created for the source and target database locations.
private void validateInitialSnapshotsCreated(String location) throws Exception {
Path locationPath = new Path(location);
DistributedFileSystem dfs = (DistributedFileSystem) locationPath.getFileSystem(conf);
// Check whether the source location became snapshottable.
assertTrue("Snapshot not enabled for the source location", dfs.getFileStatus(locationPath).isSnapshotEnabled());
// Check whether the initial snapshot got created in the source db location.
assertNotNull(dfs.getFileStatus(new Path(locationPath, ".snapshot/" + secondSnapshot(primaryDbName.toLowerCase()))));
// Verify Snapshots are created in target.
Path locationPathTarget = new Path(REPLICA_EXTERNAL_BASE, locationPath.toUri().getPath().replaceFirst("/", ""));
DistributedFileSystem dfsTarget = (DistributedFileSystem) locationPathTarget.getFileSystem(conf);
assertTrue("Snapshot not enabled for the target location",
dfsTarget.getFileStatus(locationPathTarget).isSnapshotEnabled());
// Check whether the snapshot got created in the target location.
assertNotNull(dfsTarget
.getFileStatus(new Path(locationPathTarget, ".snapshot" + "/" + firstSnapshot(primaryDbName.toLowerCase()))));
}
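// Deletes any replication-created snapshots (names containing the OLD_SNAPSHOT or NEW_SNAPSHOT suffix) under the
// given path's .snapshot directory, ignoring errors such as the directory not existing.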
public static void deleteReplRelatedSnapshots(FileSystem fs, Path path, HiveConf conf) {
try {
FileStatus[] listing = fs.listStatus(new Path(path, ".snapshot"));
for (FileStatus elem : listing) {
if (elem.getPath().getName().contains(OLD_SNAPSHOT) || elem.getPath().getName().contains(NEW_SNAPSHOT)) {
deleteSnapshotIfExists((DistributedFileSystem) fs, path, elem.getPath().getName(), conf);
}
}
} catch (Exception e) {
// Ignore
}
}
}