/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.StringAppender;
import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Base64;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances {
String extraPrimaryDb;
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, "false");
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
}
@Before
public void setup() throws Throwable {
super.setup();
extraPrimaryDb = "extra_" + primaryDbName;
}
@After
public void tearDown() throws Throwable {
primary.run("drop database if exists " + extraPrimaryDb + " cascade");
super.tearDown();
}
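// With external tables excluded from the dump, neither bootstrap nor incremental
// replication should create the external table file list or replicate t1/t2/t3.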
@Test
public void replicationWithoutExternalTables() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2)")
.run("create external table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.dump(primaryDbName, withClause);
// The _file_list_external file should be created only if external tables are to be replicated.
Path replHiveBasePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(replHiveBasePath, EximUtil.FILE_LIST_EXTERNAL)));
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] { "t1" })
.run("show tables like 't2'")
.verifyFailure(new String[] { "t2" })
.verifyReplTargetProperty(replicatedDbName);
tuple = primary.run("use " + primaryDbName)
.run("create external table t3 (id int)")
.run("insert into table t3 values (10)")
.run("insert into table t3 values (20)")
.dump(primaryDbName, withClause);
// The _file_list_external file should be created only if external tables are to be replicated.
replHiveBasePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(replHiveBasePath, EximUtil.FILE_LIST_EXTERNAL)));
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyFailure(new String[] { "t3" })
.verifyReplTargetProperty(replicatedDbName);
}
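// Bootstrap and incremental replication of external tables at their default
// (warehouse) paths; verifies the external file list, data, partitions and the
// checkpoint property on the target.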
@Test
public void externalTableReplicationWithDefaultPaths() throws Throwable {
//creates external tables with partitions
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2)")
.run("create external table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.dump(primaryDbName);
// verify that the external table file list is written correctly for bootstrap
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t2"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show tables like 't2'")
.verifyResult("t2")
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("select country from t2 where country = 'us'")
.verifyResult("us")
.run("select country from t2 where country = 'france'")
.verifyResult("france")
.run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});
String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// Ckpt should be set on bootstrapped db.
replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
tuple = primary.run("use " + primaryDbName)
.run("create external table t3 (id int)")
.run("insert into table t3 values (10)")
.run("create external table t4 as select id from t3")
.dump(primaryDbName);
// verify that the external table list is written correctly for incremental
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyResult("t3")
.run("select id from t3")
.verifyResult("10")
.run("select id from t4")
.verifyResult("10");
assertTablePartitionLocation(primaryDbName + ".t3", replicatedDbName + ".t3");
tuple = primary.run("use " + primaryDbName)
.run("drop table t1")
.dump(primaryDbName);
// verify that the external table list is written correctly for incremental
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, primary);
}
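// Same as externalTableReplicationWithDefaultPaths, but with data copy tasks
// running lazily on the target (hive.repl.run.data.copy.tasks.on.target=true).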
@Test
public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'");
//creates external tables with partitions
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2)")
.run("create external table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.dump(primaryDbName, lazyCopyClause);
// verify that the external table list is written correctly for bootstrap
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t2"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show tables like 't2'")
.verifyResult("t2")
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("select country from t2 where country = 'us'")
.verifyResult("us")
.run("select country from t2 where country = 'france'")
.verifyResult("france")
.run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});
String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// Ckpt should be set on bootstrapped db.
replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
tuple = primary.run("use " + primaryDbName)
.run("create external table t3 (id int)")
.run("insert into table t3 values (10)")
.run("create external table t4 as select id from t3")
.dump(primaryDbName, lazyCopyClause);
// verify that the external table list is written correctly for incremental
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyResult("t3")
.run("select id from t3")
.verifyResult("10")
.run("select id from t4")
.verifyResult("10");
}
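// External table at a custom location with lazy data copy: verifies that data added
// directly to the location is picked up by the next dump, and that an
// ALTER TABLE ... SET LOCATION is reflected on the replica.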
@Test
public void externalTableReplicationWithCustomPathsLazyCopy() throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
List<String> withClause = Arrays.asList(
"'distcp.options.update'=''",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'"
);
primary.run("use " + primaryDbName)
.run("create external table a (i int, j int) "
+ "row format delimited fields terminated by ',' "
+ "location '" + externalTableLocation.toUri() + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'a'")
.verifyResults(Collections.singletonList("a"))
.run("select * From a").verifyResults(Collections.emptyList());
assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
// externally add data to the table location
try (FSDataOutputStream outputStream =
fs.create(new Path(externalTableLocation, "file1.txt"))) {
outputStream.write("1,2\n".getBytes());
outputStream.write("13,21\n".getBytes());
}
primary.run("create table b (i int)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("select i From a")
.verifyResults(new String[] { "1", "13" })
.run("select j from a")
.verifyResults(new String[] { "2", "21" });
// alter table location to something new.
externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
primary.run("use " + primaryDbName)
.run("alter table a set location '" + externalTableLocation + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i From a")
.verifyResults(Collections.emptyList());
assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
}
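// Without distcp preserve options, ACLs and ownership should not be carried over to
// the target; with 'distcp.options.pugpa' set, they should be preserved.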
@Test
public void testExternalTableLocationACLPreserved() throws Throwable {
// Create data file with data for external table.
Path externalTableLocation = new Path(
"/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
fs.setOwner(externalTableLocation,"user1","group1");
Path externalFileLoc = new Path(externalTableLocation, "file1.txt");
try (FSDataOutputStream outputStream = fs.create(externalFileLoc)) {
outputStream.write("1,2\n".getBytes());
outputStream.write("13,21\n".getBytes());
}
// Set some ACLs on the table directory and the data file.
List<AclEntry> aclEntries = new ArrayList<>();
AclEntry aeUser =
new AclEntry.Builder().setName("user").setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.USER).setPermission(FsAction.ALL).build();
AclEntry aeGroup =
new AclEntry.Builder().setName("group").setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.GROUP).setPermission(FsAction.ALL).build();
AclEntry aeOther = new AclEntry.Builder().setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.OTHER).setPermission(FsAction.ALL).build();
aclEntries.add(aeUser);
aclEntries.add(aeGroup);
aclEntries.add(aeOther);
fs.modifyAclEntries(externalTableLocation, aclEntries);
fs.modifyAclEntries(externalFileLoc, aclEntries);
// Run bootstrap without the distcp options that preserve attributes.
List<String> withClause = Arrays.asList("'distcp.options.update'=''",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'");
primary.run("use " + primaryDbName).run(
"create external table a (i int, j int) "
+ "row format delimited fields terminated by ',' " + "location '"
+ externalTableLocation.toUri() + "'")
.dump(primaryDbName, withClause);
// Verify the load succeeds and the table has the appropriate data.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName).run("select i From a")
.verifyResults(new String[] {"1", "13"}).run("select j from a")
.verifyResults(new String[] {"2", "21"});
// Verify the attributes of the destination table directory and data file are
// not the same as those of the source.
Hive hiveForReplica = Hive.get(replica.hiveConf);
org.apache.hadoop.hive.ql.metadata.Table replicaTable =
hiveForReplica.getTable(replicatedDbName + ".a");
Path dataLocation = replicaTable.getDataLocation();
assertNotEquals("ACL entries are same for the data file.",
fs.getAclStatus(externalFileLoc).getEntries().size(),
fs.getAclStatus(new Path(dataLocation, "file1.txt")).getEntries()
.size());
assertNotEquals("ACL entries are same for the table directory.",
fs.getAclStatus(externalTableLocation).getEntries().size(),
fs.getAclStatus(dataLocation).getEntries().size());
assertNotEquals(fs.getFileStatus(externalTableLocation).getOwner(), fs.getFileStatus(dataLocation).getOwner());
assertNotEquals(fs.getFileStatus(externalTableLocation).getGroup(), fs.getFileStatus(dataLocation).getGroup());
// Dump & load with preserve attributes set.
withClause = Arrays
.asList("'distcp.options.update'=''", "'distcp.options.pugpa'=''",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
+ "'='true'");
primary.run("use " + primaryDbName).dump(primaryDbName, withClause);
// Verify the load succeeds and the table has the appropriate data.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName).run("select i From a")
.verifyResults(new String[] {"1", "13"}).run("select j from a")
.verifyResults(new String[] {"2", "21"});
// Verify the ACLs of the destination table directory and data file are
// the same as those of the source.
assertEquals("ACL entries are not same for the data file.",
fs.getAclStatus(externalFileLoc).getEntries().size(),
fs.getAclStatus(new Path(dataLocation, "file1.txt")).getEntries()
.size());
assertEquals("ACL entries are not same for the table directory.",
fs.getAclStatus(externalTableLocation).getEntries().size(),
fs.getAclStatus(dataLocation).getEntries().size());
assertEquals(fs.getFileStatus(externalTableLocation).getOwner(), fs.getFileStatus(dataLocation).getOwner());
assertEquals(fs.getFileStatus(externalTableLocation).getGroup(), fs.getFileStatus(dataLocation).getGroup());
}
/**
* @param sourceTableName fully qualified source table name
* @param replicaTableName fully qualified replica table name
*/
private void assertTablePartitionLocation(String sourceTableName, String replicaTableName)
throws HiveException {
Hive hiveForPrimary = Hive.get(primary.hiveConf);
org.apache.hadoop.hive.ql.metadata.Table sourceTable = hiveForPrimary.getTable(sourceTableName);
Path sourceLocation = sourceTable.getDataLocation();
Hive hiveForReplica = Hive.get(replica.hiveConf);
org.apache.hadoop.hive.ql.metadata.Table replicaTable = hiveForReplica.getTable(replicaTableName);
Path dataLocation = replicaTable.getDataLocation();
assertEquals(REPLICA_EXTERNAL_BASE + sourceLocation.toUri().getPath(),
dataLocation.toUri().getPath());
if (sourceTable.isPartitioned()) {
Set<Partition> sourcePartitions = hiveForPrimary.getAllPartitionsOf(sourceTable);
Set<Partition> replicaPartitions = hiveForReplica.getAllPartitionsOf(replicaTable);
assertEquals(sourcePartitions.size(), replicaPartitions.size());
List<String> expectedPaths =
sourcePartitions.stream()
.map(p -> REPLICA_EXTERNAL_BASE + p.getDataLocation().toUri().getPath())
.collect(Collectors.toList());
List<String> actualPaths =
replicaPartitions.stream()
.map(p -> p.getDataLocation().toUri().getPath())
.collect(Collectors.toList());
assertTrue(expectedPaths.containsAll(actualPaths));
}
}
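// Same flow as externalTableReplicationWithCustomPathsLazyCopy, but with the data
// copy running at the source (only 'distcp.options.update' is set).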
@Test
public void externalTableReplicationWithCustomPaths() throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
// Create base directory but use an HDFS path without scheme or authority details.
// Hive should pick up the local cluster's HDFS scheme/authority.
List<String> withClause = Arrays.asList(
"'distcp.options.update'=''"
);
primary.run("use " + primaryDbName)
.run("create external table a (i int, j int) "
+ "row format delimited fields terminated by ',' "
+ "location '" + externalTableLocation.toUri() + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'a'")
.verifyResults(Collections.singletonList("a"))
.run("select * From a").verifyResults(Collections.emptyList());
assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
// externally add data to the table location
try (FSDataOutputStream outputStream =
fs.create(new Path(externalTableLocation, "file1.txt"))) {
outputStream.write("1,2\n".getBytes());
outputStream.write("13,21\n".getBytes());
}
primary.run("create table b (i int)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("select i From a")
.verifyResults(new String[] { "1", "13" })
.run("select j from a")
.verifyResults(new String[] { "2", "21" });
// alter table location to something new.
externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
primary.run("use " + primaryDbName)
.run("alter table a set location '" + externalTableLocation + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i From a")
.verifyResults(Collections.emptyList());
assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
}
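// Partitioned external table: covers data added directly under the table
// directory, partitions added at custom locations, and location changes via
// ALTER TABLE/PARTITION ... SET LOCATION.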
@Test
public void externalTableWithPartitions() throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t2/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t2 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+ "'")
.run("insert into t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName, withClause);
ReplicationTestUtils.assertExternalFileList(Collections.singletonList("t2"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResults(new String[] { "t2" })
.run("select place from t2")
.verifyResults(new String[] { "bangalore" })
.verifyReplTargetProperty(replicatedDbName);
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
// add new data externally to a partition, under the table-level top directory
Path partitionDir = new Path(externalTableLocation, "country=india");
try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
outputStream.write("pune\n".getBytes());
outputStream.write("mumbai\n".getBytes());
}
tuple = primary.run("use " + primaryDbName)
.run("insert into t2 partition(country='australia') values ('sydney')")
.dump(primaryDbName, withClause);
ReplicationTestUtils.assertExternalFileList(Collections.singletonList("t2"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select distinct(country) from t2")
.verifyResults(new String[] { "india", "australia" })
.run("select place from t2 where country='india'")
.verifyResults(new String[] { "bangalore", "pune", "mumbai" })
.run("select place from t2 where country='australia'")
.verifyResults(new String[] { "sydney" })
.run("show partitions t2")
.verifyResults(new String[] {"country=australia", "country=india"})
.verifyReplTargetProperty(replicatedDbName);
Path customPartitionLocation =
new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france");
fs.mkdirs(customPartitionLocation, new FsPermission("777"));
// add a new partition to the table, at a location outside the table-level directory
try (FSDataOutputStream outputStream = fs
.create(new Path(customPartitionLocation, "file.txt"))) {
outputStream.write("paris".getBytes());
}
primary.run("use " + primaryDbName)
.run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
.toString() + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
.verifyResults(new String[] { "paris" })
.run("show partitions t2")
.verifyResults(new String[] {"country=australia", "country=france", "country=india"})
.verifyReplTargetProperty(replicatedDbName);
// change the location of the partition via alter command
String tmpLocation = "/tmp/" + System.nanoTime();
primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777"));
primary.run("use " + primaryDbName)
.run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
.verifyResults(new String[] {})
.verifyReplTargetProperty(replicatedDbName);
// Changing the location of one partition shouldn't change the location of the other
// partitions, nor that of the table.
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
// Changing the location of the external table should change the location of partitions
// residing within the table location, but not of partitions located outside it.
String tmpLocation2 = "/tmp/" + System.nanoTime() + "_2";
primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777"));
primary.run("use " + primaryDbName)
.run("insert into table t2 partition(country='france') values ('lyon')")
.run("alter table t2 set location '" + tmpLocation2 + "'")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
}
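// Same partitioned-table replication, with partitions loaded one per batch
// (hive.repl.load.partitions.batch.size=1).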
@Test
public void externalTableWithPartitionsInBatch() throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t2/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE.varname + "'='" + 1 + "'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t2 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+ "'")
.run("insert into t2 partition(country='india') values ('bangalore')")
.run("insert into t2 partition(country='france') values ('paris')")
.run("insert into t2 partition(country='australia') values ('sydney')")
.dump(primaryDbName, withClause);
ReplicationTestUtils.assertExternalFileList(Collections.singletonList("t2"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResults(new String[] { "t2" })
.run("select distinct(country) from t2")
.verifyResults(new String[] { "india", "france", "australia" })
.run("select place from t2")
.verifyResults(new String[] { "bangalore", "paris", "sydney" })
.verifyReplTargetProperty(replicatedDbName);
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
}
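// Verifies that an incremental dump resumes from a partially written dump
// directory: already dumped events are not rewritten and the resumed dump
// lands in the same dump location.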
@Test
public void externalTableIncrementalCheckpointing() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2)")
.run("create external table t2 (id int)")
.run("insert into table t2 values (3)")
.run("insert into table t2 values (4)")
.dump(primaryDbName, withClause);
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t1", "t2"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("select * from t1")
.verifyResults(new String[] {"1", "2"})
.run("select * from t2")
.verifyResults(new String[] {"3", "4"})
.verifyReplTargetProperty(replicatedDbName);
ReplDumpWork.testDeletePreviousDumpMetaPath(true);
withClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true);
WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
.run("drop table t1")
.run("insert into table t2 values (5)")
.run("insert into table t2 values (6)")
.run("create external table t3 (id int)")
.run("insert into table t3 values (8)")
.dump(primaryDbName, withClause);
// verify that the external table list is written correctly for incremental
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t2", "t3"), incrementalDump1.dumpLocation, primary);
FileSystem fs = primary.miniDFSCluster.getFileSystem();
Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
assertTrue(fs.exists(ackFile));
assertTrue(fs.exists(ackLastEventID));
Path bootstrapDir = new Path(hiveDumpDir, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME);
Path dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME);
assertFalse(fs.exists(dataDir));
long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
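// Simulate a partially failed dump by removing the dump ack and events-dump metadata.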
fs.delete(ackFile, false);
EventsDumpMetadata eventsDumpMetadata = EventsDumpMetadata.deserialize(ackLastEventID, conf);
fs.delete(ackLastEventID, false);
// delete all the event folders except the first event
long startEvent = -1;
long endEvent = Long.parseLong(incrementalDump1.lastReplicationId);
int deletedEventsCount = 0;
for (long eventDir = Long.parseLong(tuple.lastReplicationId) + 1; eventDir <= endEvent; eventDir++) {
Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventDir));
if (fs.exists(eventRoot)) {
if (startEvent == -1){
startEvent = eventDir;
} else {
deletedEventsCount++;
fs.delete(eventRoot, true);
}
}
}
Path startEventRoot = new Path(hiveDumpDir, String.valueOf(startEvent));
Map<Path, Long> firstEventModTimeMap = new HashMap<>();
for (FileStatus fileStatus: fs.listStatus(startEventRoot)) {
firstEventModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime());
}
assertTrue(endEvent - startEvent > 1);
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount() - deletedEventsCount);
Utils.writeOutput(eventsDumpMetadata.serialize(), ackLastEventID, primary.hiveConf);
WarehouseInstance.Tuple incrementalDump2 = primary.dump(primaryDbName, withClause);
assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
assertTrue(fs.getFileStatus(metaDir).getModificationTime() > oldMetadirModTime);
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t2", "t3"), incrementalDump2.dumpLocation, primary);
// the first event's metadata is not rewritten
for (Map.Entry<Path, Long> entry: firstEventModTimeMap.entrySet()) {
assertEquals((long)entry.getValue(), fs.getFileStatus(entry.getKey()).getModificationTime());
}
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(incrementalDump2.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1"})
.run("select * from t2")
.verifyResults(new String[] {"3", "4", "5", "6"})
.run("select * from t3")
.verifyResult("8")
.verifyReplTargetProperty(replicatedDbName);
}
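// Incremental replication of a partitioned external table: data written directly
// to partition directories is only visible on the target after the next
// dump-and-load cycle; dropping partitions does not delete replica data files.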
@Test
public void externalTableIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t1/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (place string) partitioned by (country string) row format "
+ "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+ "'")
.run("alter table t1 add partition(country='india')")
.run("alter table t1 add partition(country='us')")
.dump(primaryDbName);
ReplicationTestUtils.assertExternalFileList(Collections.singletonList("t1"), tuple.dumpLocation, primary);
// Add new data externally to a partition, under the partition-level top directory.
// It is added after the dump, so the data should not be seen at the target after REPL LOAD.
Path partitionDir = new Path(externalTableLocation, "country=india");
try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
outputStream.write("pune\n".getBytes());
outputStream.write("mumbai\n".getBytes());
}
try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
outputStream.write("bangalore\n".getBytes());
}
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show partitions t1")
.verifyResults(new String[] { "country=india", "country=us" })
.run("select place from t1 order by place")
.verifyResults(new String[] {})
.verifyReplTargetProperty(replicatedDbName);
// The data should be seen after the next dump-and-load cycle.
tuple = primary.run("use " + primaryDbName)
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show partitions t1")
.verifyResults(new String[] {"country=india", "country=us"})
.run("select place from t1 order by place")
.verifyResults(new String[] {"bangalore", "mumbai", "pune"})
.verifyReplTargetProperty(replicatedDbName);
// Delete one of the files and overwrite the other with new content.
fs.delete(new Path(partitionDir, "file.txt"), true);
fs.delete(new Path(partitionDir, "file1.txt"), true);
try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
outputStream.write("chennai\n".getBytes());
}
// A repl dump with zero events should still produce the external tables file list.
tuple = primary.dump(primaryDbName);
ReplicationTestUtils.assertExternalFileList(Collections.singletonList("t1"), tuple.dumpLocation, primary);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show partitions t1")
.verifyResults(new String[] { "country=india", "country=us" })
.run("select place from t1 order by place")
.verifyResults(new String[] { "chennai" })
.verifyReplTargetProperty(replicatedDbName);
Hive hive = Hive.get(replica.getConf());
Set<Partition> partitions =
hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1"));
List<String> paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath())
.collect(Collectors.toList());
tuple = primary
.run("alter table t1 drop partition (country='india')")
.run("alter table t1 drop partition (country='us')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("select * From t1")
.verifyResults(new String[] {})
.verifyReplTargetProperty(replicatedDbName);
for (String path : paths) {
assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
}
}
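// External tables are bootstrapped during an incremental cycle once the
// external-table bootstrap option is enabled; verifies the _bootstrap
// directory layout and the checkpoint property on bootstrapped tables.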
@Test
public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
List<String> dumpWithClause = Collections.singletonList(
"'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
);
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2)")
.run("create external table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.dump(primaryDbName, dumpWithClause);
// The _file_list_external file should be created only if external tables are to be replicated.
Path replHiveBasePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(replHiveBasePath, EximUtil.FILE_LIST_EXTERNAL)));
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1" })
.run("show tables like 't2'")
.verifyFailure(new String[] {"t2" })
.verifyReplTargetProperty(replicatedDbName);
dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true);
tuple = primary.run("use " + primaryDbName)
.run("drop table t1")
.run("create external table t3 (id int)")
.run("insert into table t3 values (10)")
.run("insert into table t3 values (20)")
.run("create table t4 as select * from t3")
.dump(primaryDbName, dumpWithClause);
// the _file_list_external should be created since external tables are to be replicated.
Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertTrue(primary.miniDFSCluster.getFileSystem()
.exists(new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL)));
// verify that the external table list is written correctly for incremental
ReplicationTestUtils.assertExternalFileList(Arrays.asList("t2", "t3"), tuple.dumpLocation, primary);
// The _bootstrap directory should be created since bootstrap is enabled for external tables.
String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
Path dumpPath = new Path(hiveDumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME);
assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
// _bootstrap/<db_name>/t2
// _bootstrap/<db_name>/t3
Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName);
Path tblPath = new Path(dbPath, "t2");
assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
tblPath = new Path(dbPath, "t3");
assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1" })
.run("show tables like 't2'")
.verifyResult("t2")
.run("show tables like 't3'")
.verifyResult("t3")
.run("show tables like 't4'")
.verifyResult("t4")
.verifyReplTargetProperty(replicatedDbName);
// Ckpt should be set on bootstrapped tables.
replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), hiveDumpLocation);
// Drop the source tables to verify the target points to the correct data after bootstrap load.
primary.run("use " + primaryDbName)
.run("drop table t2")
.run("drop table t3");
// The create table event for t4 should be applied along with the bootstrap of t2 and t3.
replica.run("use " + replicatedDbName)
.run("select place from t2 where country = 'us'")
.verifyResult("austin")
.run("select place from t2 where country = 'france'")
.verifyResult("paris")
.run("select id from t3 order by id")
.verifyResults(Arrays.asList("10", "20"))
.run("select id from t4 order by id")
.verifyResults(Arrays.asList("10", "20"))
.verifyReplTargetProperty(replicatedDbName);
}
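// Retry of a failed external-table bootstrap from a later dump: half-baked
// tables from the failed load must be dropped and re-bootstrapped cleanly.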
@Test
public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable {
List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true);
List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(false);
WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("create external table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("create table t3 as select * from t1")
.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResult("t3")
.run("select id from t3")
.verifyResult("1")
.verifyReplTargetProperty(replicatedDbName);
dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true);
primary.run("use " + primaryDbName)
.run("drop table t1")
.run("create external table t4 (id int)")
.run("insert into table t4 values (10)")
.run("create table t5 as select * from t4")
.dump(primaryDbName, dumpWithClause);
// Fail setting the ckpt property for table t4 but succeed for t2.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
@Nullable
@Override
public Boolean apply(@Nullable CallerArguments args) {
if (args.tblName.equalsIgnoreCase("t4") && args.dbName.equalsIgnoreCase(replicatedDbName)) {
injectionPathCalled = true;
LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
return false;
}
return true;
}
};
// Fail repl load before the ckpt property is set for t4 and after it is set for t2.
// On retry, these half-baked tables should be dropped and bootstrap should succeed.
InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
try {
replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetAlterTableModifier();
}
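// The failed load leaves a non-recoverable marker under the dump dir; remove it
// so the retried load can proceed.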
Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
Path nonRecoverablePath = TestReplicationScenarios.getNonRecoverablePath(baseDumpDir, primaryDbName, primary.hiveConf);
if (nonRecoverablePath != null) {
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
}
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[]{"t1"})
.run("show tables like 't2'")
.verifyResult("t2")
.run("select country from t2 order by country")
.verifyResults(new String[] {"india", "us"})
.run("select id from t4")
.verifyResults(Arrays.asList("10"))
.run("select id from t5")
.verifyResult("10")
.verifyReplTargetProperty(replicatedDbName);
// Insert into the existing external table, then drop it, create a managed table with the
// same name, and take another dump for external tables.
dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true);
primary.run("use " + primaryDbName)
.run("insert into table t2 partition(country='india') values ('chennai')")
.run("drop table t2")
.run("create table t2 as select * from t4")
.run("insert into table t4 values (20)")
.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[]{"t1"})
.run("select id from t2")
.verifyResult("10")
.run("select id from t4")
.verifyResults(Arrays.asList("10", "20"))
.run("select id from t5")
.verifyResult("10")
.verifyReplTargetProperty(replicatedDbName);
}
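// externalTableDataPath maps a source path under the configured base dir,
// e.g. base '/tmp' + source '/abc/xyz' -> '/tmp/abc/xyz'; trailing slashes in
// the base are ignored.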
@Test
public void testExternalTableDataPath() throws Exception {
HiveConf conf = primary.getConf();
Path basePath = new Path("/");
Path sourcePath = new Path("/abc/xyz");
Path dataPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
assertTrue(dataPath.toUri().getPath().equalsIgnoreCase("/abc/xyz"));
basePath = new Path("/tmp");
dataPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
assertTrue(dataPath.toUri().getPath().equalsIgnoreCase("/tmp/abc/xyz"));
basePath = new Path("/tmp/");
dataPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
assertTrue(dataPath.toUri().getPath().equalsIgnoreCase("/tmp/abc/xyz"));
basePath = new Path("/tmp/tmp1//");
dataPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
assertTrue(dataPath.toUri().getPath().equalsIgnoreCase("/tmp/tmp1/abc/xyz"));
}
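// A table dropped concurrently with the dump (simulated by a getTable that
// returns null for t1) is skipped from the external file list without failing the dump.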
@Test
public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable {
List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), null, true);
List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true);
WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName, loadWithClause);
// Insert a row into "t1" and create another external table using data from "t1".
primary.run("use " + primaryDbName)
.run("insert into table t1 values (2)")
.run("create external table t2 as select * from t1");
// Inject a behavior so that getTable returns null for table "t1". This ensures the table is
// skipped when listing data files.
BehaviourInjection<Table, Table> tableNuller = new BehaviourInjection<Table, Table>() {
@Nullable
@Override
public Table apply(@Nullable Table table) {
LOG.info("Performing injection on table " + table.getTableName());
if (table.getTableName().equalsIgnoreCase("t1")){
injectionPathCalled = true;
return null;
} else {
nonInjectedPathCalled = true;
return table;
}
}
};
InjectableBehaviourObjectStore.setGetTableBehaviour(tableNuller);
WarehouseInstance.Tuple tupleInc;
try {
// The t1 table will be skipped from data location listing.
tupleInc = primary.dump(primaryDbName, dumpWithClause);
tableNuller.assertInjectionsPerformed(true, true);
} finally {
InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour
}
// Only table t2 should exist in the data location list file.
ReplicationTestUtils.assertExternalFileList(Collections.singletonList("t2"), tupleInc.dumpLocation, primary);
// The newly inserted data "2" should be missing from table "t1", but table t2 should exist
// and contain the inserted data.
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select id from t1 order by id")
.verifyResult("1")
.run("select id from t2 order by id")
.verifyResults(Arrays.asList("1", "2"));
}
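// An incremental dump with no events for the database should still load
// successfully and advance the repl status to the last event id at the source.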
@Test
public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true);
List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true);
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2)")
.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
// This looks like an empty dump but it has the ALTER TABLE event created by the previous
// dump. We need it here so that the next dump won't have any events.
WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(incTuple.lastReplicationId);
// Create events for some other database and then dump primaryDbName to produce an empty dump directory.
primary.run("create database " + extraPrimaryDb + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')");
WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb)
.run("create table tbl (fld int)")
.run("use " + primaryDbName)
.dump(primaryDbName, dumpWithClause);
Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(),
Long.valueOf(inc2Tuple.lastReplicationId).longValue());
// Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(inc2Tuple.lastReplicationId);
}
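// External table bootstrap requested during an incremental cycle that carries
// no other events: the bootstrap of t1 should still happen.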
@Test
public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Throwable {
List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false);
List<String> dumpWithClause = Collections.singletonList(
"'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
);
WarehouseInstance.Tuple bootstrapDump = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values (1)")
.run("create table t2 (id int)")
.run("insert into table t2 values (1)")
.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(bootstrapDump.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1" })
.run("show tables like 't2'")
.verifyResult("t2")
.verifyReplTargetProperty(replicatedDbName);
// This looks like an empty dump but it has the ALTER TABLE event created by the previous
// dump. We need it here so that the next dump won't have any events.
WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, ReplicationTestUtils.includeExternalTableClause(true));
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(incTuple.lastReplicationId);
// Take a dump with external tables bootstrapped and load it
dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true);
WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(inc2Tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show tables like 't2'")
.verifyResult("t2")
.verifyReplTargetProperty(replicatedDbName);
}
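// Table names containing keyword-like suffixes (functions, constraints,
// bootstrap, tables) must replicate like any other table.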
@Test
public void replicationWithTableNameContainsKeywords() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1_functions (id int)")
.run("insert into table t1_functions values (1)")
.run("insert into table t1_functions values (2)")
.run("create external table t2_constraints (place string) partitioned by (country string)")
.run("insert into table t2_constraints partition(country='india') values ('bangalore')")
.run("insert into table t2_constraints partition(country='us') values ('austin')")
.run("insert into table t2_constraints partition(country='france') values ('paris')")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1_functions'")
.verifyResults(new String[] {"t1_functions"})
.run("show tables like 't2_constraints'")
.verifyResults(new String[] {"t2_constraints"})
.run("select id from t1_functions")
.verifyResults(new String[] {"1", "2"})
.verifyReplTargetProperty(replicatedDbName);
primary.run("use " + primaryDbName)
.run("create external table t3_bootstrap (id int)")
.run("insert into table t3_bootstrap values (10)")
.run("insert into table t3_bootstrap values (20)")
.run("create table t4_tables (id int)")
.run("insert into table t4_tables values (10)")
.run("insert into table t4_tables values (20)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 't3_bootstrap'")
.verifyResults(new String[] {"t3_bootstrap"})
.run("show tables like 't4_tables'")
.verifyResults(new String[] {"t4_tables"})
.verifyReplTargetProperty(replicatedDbName);
}
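// hive.repl.replica.external.table.base.dir must be a fully qualified path; an
// unqualified path fails the dump/load and leaves a non-recoverable marker that
// must be cleared before retrying.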
@Test
public void testExternalTableBaseDirMandatory() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='/extTablebase'");
WarehouseInstance.Tuple tuple = null;
try {
primary.run("use " + primaryDbName)
.run("create external table t1 (id int)")
.run("insert into table t1 values(1)")
.dump(primaryDbName, withClause);
} catch (SemanticException ex) {
assertTrue(ex.getMessage().contains(
"Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required"));
}
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname
+ "'='"+ fullyQualifiedReplicaExternalBase +"'");
try {
primary.run("use " + primaryDbName)
.dump(primaryDbName, withClause);
} catch (Exception e) {
Assert.assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
// delete the non-recoverable marker
Path dumpPath = new Path(primary.hiveConf.get(HiveConf.ConfVars.REPLDIR.varname),
Base64.getEncoder().encodeToString(primaryDbName.toLowerCase()
.getBytes(StandardCharsets.UTF_8)));
FileSystem fs = dumpPath.getFileSystem(conf);
Path nonRecoverableMarker = new Path(fs.listStatus(dumpPath)[0].getPath(), ReplAck.NON_RECOVERABLE_MARKER
.toString());
fs.delete(nonRecoverableMarker, true);
tuple = primary.run("use " + primaryDbName)
.dump(primaryDbName, withClause);
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'=''");
try {
replica.load(replicatedDbName, primaryDbName, withClause);
} catch (SemanticException ex) {
assertTrue(ex.getMessage().contains(
"Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required"));
}
withClause = ReplicationTestUtils.includeExternalTableClause(true);
withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname
+ "'='"+ fullyQualifiedReplicaExternalBase +"'");
try {
replica.load(replicatedDbName, primaryDbName, withClause);
} catch (Exception e) {
Assert.assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
// delete the non-recoverable marker
nonRecoverableMarker = new Path(tuple.dumpLocation, ReplAck.NON_RECOVERABLE_MARKER
.toString());
fs.delete(nonRecoverableMarker, true);
replica.load(replicatedDbName, primaryDbName, withClause);
replica.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"})
.run("select id from t1")
.verifyResults(new String[] {"1"})
.verifyReplTargetProperty(replicatedDbName);
}
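// Databases and tables in a non-hive catalog (spark) must be ignored by both
// bootstrap and incremental replication.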
@Test
public void differentCatalogIncrementalReplication() throws Throwable {
//Create the catalog
Catalog catalog = new Catalog();
catalog.setName("spark");
Warehouse wh = new Warehouse(conf);
catalog.setLocationUri(wh.getWhRootExternal().toString() + File.separator + catalog.getName());
catalog.setDescription("Non-hive catalog");
Hive.get(primary.hiveConf).getMSC().createCatalog(catalog);
//Create database and table in spark catalog
String sparkDbName = "src_spark";
Database sparkdb = new Database();
sparkdb.setCatalogName("spark");
sparkdb.setName(sparkDbName);
Hive.get(primary.hiveConf).getMSC().createDatabase(sparkdb);
SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(),
new HashMap<String, String>());
ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(1);
cols.add(new FieldSchema("place", serdeConstants.STRING_TYPE_NAME, ""));
StorageDescriptor sd
= new StorageDescriptor(cols, null,
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
false, 0, serdeInfo, null, null, null);
Map<String, String> tableParameters = new HashMap<String, String>();
Table sparkTable = new Table("mgt1", sparkDbName, "", 0, 0, 0,
sd, null, tableParameters, "", "", "");
sparkTable.setCatName("spark");
Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
//create same db in hive catalog
Map<String, String> params = new HashMap<>();
params.put(SOURCE_OF_REPLICATION, "1");
Database hiveDb = new Database();
hiveDb.setCatalogName("hive");
hiveDb.setName(sparkDbName);
hiveDb.setParameters(params);
Hive.get(primary.hiveConf).getMSC().createDatabase(hiveDb);
primary.dump(sparkDbName);
// Tables in the spark catalog are not replicated by bootstrap.
replica.load(replicatedDbName, sparkDbName)
.run("use " + replicatedDbName)
.run("show tables like 'mgt1'")
.verifyResult(null);
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t1/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
//Create another table in spark
sparkTable = new Table("mgt2", sparkDbName, "", 0, 0, 0,
sd, null, tableParameters, "", "", "");
sparkTable.setCatName("spark");
Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
//Incremental load shouldn't copy any events from spark catalog
primary.dump(sparkDbName);
replica.load(replicatedDbName, sparkDbName)
.run("use " + replicatedDbName)
.run("show tables like mgdt1")
.verifyResult(null)
.run("show tables like 'mgt2'")
.verifyResult(null);
primary.run("drop database if exists " + sparkDbName + " cascade");
}
@Test
public void testDatabaseLevelCopyLazy() throws Throwable {
testDatabaseLevelCopy(true);
}
@Test
public void testDatabaseLevelCopyAtSource() throws Throwable {
testDatabaseLevelCopy(false);
}
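/**
 * Shared body for the database-level copy tests. With
 * REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK enabled, external tables under the
 * database location are expected to be covered by a single copy task for the
 * database path, while tables and partitions outside it get their own tasks.
 */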
public void testDatabaseLevelCopy(boolean runCopyTasksOnTarget)
throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
Path externalTablePartitionLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "part/");
fs.mkdirs(externalTablePartitionLocation, new FsPermission("777"));
List<String> withClause = Arrays.asList("'distcp.options.update'=''",
"'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
+ "'='" + runCopyTasksOnTarget + "'");
// Create a table within the warehouse location, one outside and one with
// a partition outside the default location.
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create external table a (i int, j int) "
+ "row format delimited fields terminated by ',' "
+ "location '" + externalTableLocation.toUri() + "'")
.run("insert into a values(1,2)")
.run("create external table b (id int)")
.run("insert into b values(5)")
.run("create external table c (place string) partitioned by (country "
+ "string)")
.run("insert into table c partition(country='india') values "
+ "('bangalore')")
.run("ALTER TABLE c ADD PARTITION (country='france') LOCATION '"
+ externalTablePartitionLocation.toString() + "'")
.run("insert into c partition(country='france') values('paris')")
.dump(primaryDbName, withClause);
Database primaryDb = primary.getDatabase(primaryDbName);
// Confirm table 'a' is outside the db location.
Table aTable = primary.getTable(primaryDbName, "a");
assertFalse(FileUtils
.isPathWithinSubtree(new Path(aTable.getSd().getLocation()),
new Path(primaryDb.getLocationUri())));
// Confirm table 'b' is inside the db location.
Table bTable = primary.getTable(primaryDbName, "b");
assertTrue(FileUtils
.isPathWithinSubtree(new Path(bTable.getSd().getLocation()),
new Path(primaryDb.getLocationUri())));
// Confirm table 'c' itself is inside the db location.
Table cTable = primary.getTable(primaryDbName, "c");
assertTrue(FileUtils
.isPathWithinSubtree(new Path(cTable.getSd().getLocation()),
new Path(primaryDb.getLocationUri())));
// Confirm the partition of table 'c' is outside the db location.
String partitionCtableLocation =
primary.getAllPartitions(primaryDbName, "c").get(0).getSd()
.getLocation();
assertFalse(FileUtils.isPathWithinSubtree(new Path(partitionCtableLocation),
new Path(primaryDb.getLocationUri())));
// Do a load and verify all the data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i from a where j=2")
.verifyResult("1")
.run("select id from b")
.verifyResult("5")
.run("select place from c where country='india'")
.verifyResult("bangalore")
.run("select place from c where country='france'")
.verifyResult("paris");
// Check the copy tasks created post bootstrap. The list should contain the
// database path, table 'a' since it is outside the default location, and
// table 'c' since its partition is outside the default location.
ReplicationTestUtils
.assertExternalFileList(Arrays.asList("dbPath:" + new Path(primaryDb.getLocationUri()).getName(), "a", "c"),
tuple.dumpLocation, primary);
// Add more data to the tables, then do an incremental run after creating
// two more tables, one inside and one outside the default location.
externalTableLocation = new Path(
"/" + testName.getMethodName() + "/" + primaryDbName + "/" + "newout/");
fs.mkdirs(externalTableLocation, new FsPermission("777"));
tuple =
primary.run("use " + primaryDbName)
.run("insert into a values(3,4)")
.run("insert into b values(6)")
.run("insert into table c partition(country='india') values "
+ "('delhi')")
.run("insert into c partition(country='france') values('lyon')")
.run("create external table newin (id int)")
.run("insert into newin values(1)")
.run("create external table newout(id int) row format delimited "
+ "fields terminated by ',' location '" + externalTableLocation
.toUri() + "'")
.run("insert into newout values(2)")
.dump(primaryDbName, withClause);
// Check whether table newin is inside the database location.
Table tableNewin = primary.getTable(primaryDbName,"newin");
assertTrue(FileUtils
.isPathWithinSubtree(new Path(tableNewin.getSd().getLocation()),
new Path(primaryDb.getLocationUri())));
// Check whether table newout is outside the database location.
Table tableNewout = primary.getTable(primaryDbName,"newout");
assertFalse(FileUtils
.isPathWithinSubtree(new Path(tableNewout.getSd().getLocation()),
new Path(primaryDb.getLocationUri())));
// Do an incremental load and check if all the old and new data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i from a where j=4")
.verifyResult("3")
.run("select id from b")
.verifyResults(new String[]{"5", "6"})
.run("select place from c where country='india'")
.verifyResults(new String[]{"bangalore", "delhi"})
.run("select place from c where country='france'")
.verifyResults(new String[]{"paris", "lyon"})
.run("select id from newin")
.verifyResult("1")
.run("select id from newout")
.verifyResult("2");
// The new table inside the warehouse shouldn't add a copy task, but the
// table created outside should, in addition to the ones from the previous run.
ReplicationTestUtils.assertExternalFileList(
Arrays.asList("dbPath:" + new Path(primaryDb.getLocationUri()).getName(), "a", "c", "newout"),
tuple.dumpLocation, primary);
}
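/**
 * Counterpart of testDatabaseLevelCopy with
 * REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK disabled: every external table is
 * expected to get its own copy task, whether or not it lives under the
 * database location.
 */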
@Test
public void testDatabaseLevelCopyDisabled() throws Throwable {
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(externalTableLocation, new FsPermission("777"));
Path externalTablePartitionLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "part/");
fs.mkdirs(externalTablePartitionLocation, new FsPermission("777"));
List<String> withClause = Arrays.asList("'distcp.options.update'=''",
"'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='false'");
// Create a table within the warehouse location, one outside and one with
// a partition outside the default location.
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create external table a (i int, j int) "
+ "row format delimited fields terminated by ',' "
+ "location '" + externalTableLocation.toUri() + "'")
.run("insert into a values(1,2)")
.run("create external table b (id int)")
.run("insert into b values(5)")
.run("create external table c (place string) partitioned by (country "
+ "string)")
.run("insert into table c partition(country='india') values "
+ "('bangalore')")
.run("ALTER TABLE c ADD PARTITION (country='france') LOCATION '"
+ externalTablePartitionLocation.toString() + "'")
.run("insert into c partition(country='france') values('paris')")
.dump(primaryDbName, withClause);
// Do a load and verify all the data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i from a where j=2")
.verifyResult("1")
.run("select id from b")
.verifyResult("5")
.run("select place from c where country='india'")
.verifyResult("bangalore")
.run("select place from c where country='france'")
.verifyResult("paris");
ReplicationTestUtils.assertExternalFileList(Arrays.asList("a", "b", "c"), tuple.dumpLocation, primary);
// Add more data to the tables, then do an incremental run after creating
// two more tables, one inside and one outside the default location.
externalTableLocation = new Path(
"/" + testName.getMethodName() + "/" + primaryDbName + "/" + "newout/");
fs.mkdirs(externalTableLocation, new FsPermission("777"));
tuple =
primary.run("use " + primaryDbName)
.run("insert into a values(3,4)")
.run("insert into b values(6)")
.run("insert into table c partition(country='india') values "
+ "('delhi')")
.run("insert into c partition(country='france') values('lyon')")
.run("create external table newin (id int)")
.run("insert into newin values(1)")
.run("create external table newout(id int) row format delimited "
+ "fields terminated by ',' location '" + externalTableLocation
.toUri() + "'")
.run("insert into newout values(2)")
.dump(primaryDbName, withClause);
// Do an incremental load and check if all the old and new data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i from a where j=4")
.verifyResult("3")
.run("select id from b")
.verifyResults(new String[]{"5", "6"})
.run("select place from c where country='india'")
.verifyResults(new String[]{"bangalore", "delhi"})
.run("select place from c where country='france'")
.verifyResults(new String[]{"paris", "lyon"})
.run("select id from newin")
.verifyResult("1")
.run("select id from newout")
.verifyResult("2");
ReplicationTestUtils.assertExternalFileList(Arrays
.asList("a", "b", "c", "newin", "newout"), tuple.dumpLocation, primary);
}
@Test
public void testDataCopyEndLogAtSource() throws Throwable {
testDataCopyEndLog(false);
}
@Test
public void testDataCopyEndLogAtTarget() throws Throwable {
testDataCopyEndLog(true);
}
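/**
 * Shared body for the DATA_COPY_END ordering tests. It captures the DEBUG
 * output of the 'hive.ql.metadata.Hive' logger via a StringAppender and
 * asserts that the REPL::DATA_COPY_END marker is logged only after the
 * DirCopyTasks have completed, for both bootstrap and incremental cycles.
 */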
public void testDataCopyEndLog(boolean runCopyTasksOnTarget) throws Throwable {
// Get the logger for 'hive.ql.metadata.Hive', which the copy tasks log to.
Logger logger = LogManager.getLogger("hive.ql.metadata.Hive");
Level oldLevel = logger.getLevel();
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
Configuration config = ctx.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(logger.getName());
loggerConfig.setLevel(Level.DEBUG);
ctx.updateLoggers();
// Create a String Appender to capture log output
StringAppender appender = StringAppender.createStringAppender("%m");
appender.addToLogger(logger.getName(), Level.DEBUG);
appender.start();
appender.reset();
List<String> withClause = Arrays.asList("'distcp.options.update'=''",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + runCopyTasksOnTarget + "'");
// Perform bootstrap dump & load.
primary.run("use " + primaryDbName)
.run("create external table a (i int)")
.run("insert into a values (1),(2),(3),(4)")
.run("create external table b (i int)")
.run("insert into b values (5),(6),(7),(8)")
.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables like 'a'")
.verifyResults(Collections.singletonList("a"))
.run("show tables like 'b'")
.verifyResults(Collections.singletonList("b"));
String logStr = appender.getOutput();
// Check the log contains DATA_COPY_END after Distcp
assertTrue(logStr.lastIndexOf("REPL::DATA_COPY_END:") > logStr.lastIndexOf("Completed DirCopyTask for source"));
appender.reset();
// Perform incremental dump & load.
primary.run("use " + primaryDbName)
.run("create table c (i int)")
.run("insert into c values (10),(11)")
.run("insert into a values (5),(6)")
.run("insert into b values (9),(10)").dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select i From c")
.verifyResults(new String[] {"10", "11"});
logStr = appender.getOutput();
// Check the log contains DATA_COPY_END after Distcp
assertTrue(logStr.indexOf("REPL::DATA_COPY_END:") > logStr.lastIndexOf("Completed DirCopyTask for source:"));
loggerConfig.setLevel(oldLevel);
ctx.updateLoggers();
appender.removeFromLogger(logger.getName());
}
@Test
public void testSingleCopyTasksAtSource() throws Throwable {
testSingleCopyTasks(false);
}
@Test
public void testSingleCopyTasksAtTarget() throws Throwable {
testSingleCopyTasks(true);
}
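/**
 * Shared body for the single-copy-task tests with explicitly configured
 * parent paths: tables under a path listed in
 * REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS should be covered by one
 * copy task per configured parent, plus one task for the database location.
 */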
public void testSingleCopyTasks(boolean runCopyTasksOnTarget)
throws Throwable {
// Create five tables, 2 inside each parent path and one inside the database location.
Path parentPath1 = new Path("/" + testName.getMethodName() + "/" + "external1");
Path parentPath2 = new Path("/" + testName.getMethodName() + "/" + "external2");
Path parent1Table1 = new Path(parentPath1,"table1");
Path parent1Table2 = new Path(parentPath1,"table2");
Path parent2Table3 = new Path(parentPath2,"table3");
Path parent2Table4 = new Path(parentPath2,"table4");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(parent1Table1, new FsPermission("777"));
fs.mkdirs(parent1Table2, new FsPermission("777"));
fs.mkdirs(parent2Table3, new FsPermission("777"));
fs.mkdirs(parent2Table4, new FsPermission("777"));
List<String> withClause = Arrays
.asList("'distcp.options.update'=''", "'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + runCopyTasksOnTarget + "'",
"'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS.varname + "'='" + fs.makeQualified(parentPath1) + ","
+ fs.makeQualified(parentPath2) + "'");
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create external table table1 (id int) row format delimited fields terminated by ',' location '"
+ parent1Table1.toUri() + "'")
.run("create external table table2 (id int) row format delimited fields terminated by ',' location '"
+ parent1Table2.toUri() + "'")
.run("create external table table3 (id int) row format delimited fields terminated by ',' location '"
+ parent2Table3.toUri() + "'")
.run("create external table table4 (id int) row format delimited fields terminated by ',' location '"
+ parent2Table4.toUri() + "'")
.run("create external table table5(id int) row format delimited fields terminated by ','")
.run("insert into table1 values (1)")
.run("insert into table2 values (2)")
.run("insert into table3 values (3)")
.run("insert into table4 values (4)")
.run("insert into table4 values (5)")
.dump(primaryDbName, withClause);
Path primaryDb = new Path(primary.getDatabase(primaryDbName).getLocationUri());
// Do a load and verify all the data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select id from table1 ")
.verifyResult("1")
.run("select id from table2")
.verifyResult("2")
.run("select id from table3")
.verifyResult("3")
.run("select id from table4")
.verifyResult("4")
.run("select id from table5")
.verifyResult("5");
// Verify the tasks created for these tables: there should be one for the
// database and two for the two parent paths that we configured.
ReplicationTestUtils.assertExternalFileList(
Arrays.asList(primaryDb.getName(), parentPath1.getName(), parentPath2.getName()), tuple.dumpLocation, primary);
// Add more data to the tables, do an incremental run and check that the
// same set of tasks gets created.
tuple =
primary.run("use " + primaryDbName)
.run("insert into table1 values (4)")
.run("insert into table2 values (3)")
.run("insert into table3 values (2)")
.run("insert into table4 values (1)")
.run("insert into table5 values (6)")
.dump(primaryDbName, withClause);
// Do an incremental load and check if all the old and new data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select id from table1 ")
.verifyResults(new String[] {"1", "4"})
.run("select id from table2 ")
.verifyResults(new String[] {"2", "3"})
.run("select id from table3 ")
.verifyResults(new String[] {"3", "2"})
.run("select id from table4")
.verifyResults(new String[] {"4", "1"})
.run("select id from table5")
.verifyResults(new String[] {"5", "6"});
// The same tasks should be created.
ReplicationTestUtils.assertExternalFileList(
Arrays.asList(primaryDb.getName(), parentPath1.getName(), parentPath2.getName()), tuple.dumpLocation, primary);
}
@Test
public void testSingleCopyTasksConfigurationAtSource() throws Throwable {
testSingleCopyTasksConfiguration(false);
}
@Test
public void testSingleCopyTasksConfigurationAtTarget() throws Throwable {
testSingleCopyTasksConfiguration(true);
}
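/**
 * Shared body for the single-copy-task configuration tests: verifies that
 * db-path-scoped DistCp options (here a filter file) apply only to
 * database-level copy paths and not to per-table copy tasks.
 */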
public void testSingleCopyTasksConfiguration(boolean runCopyTasksOnTarget)
throws Throwable {
// Create three tables: one inside the custom parent path, one inside the
// database location and one at a separate external location.
Path parentPath1 = new Path("/" + testName.getMethodName() + "/" + "external1");
Path parent1Table1 = new Path(parentPath1,"table1");
Path externalTablePath = new Path("/" + testName.getMethodName() + "/" + "externaltable2");
DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.mkdirs(parent1Table1, new FsPermission("777"));
fs.mkdirs(externalTablePath, new FsPermission("777"));
try (FSDataOutputStream outputStream =
fs.create(new Path(externalTablePath, "file1.txt"))) {
outputStream.write("3\n".getBytes());
outputStream.write("4\n".getBytes());
}
try (FSDataOutputStream outputStream =
fs.create(new Path(parent1Table1, "file1.txt"))) {
outputStream.write("3\n".getBytes());
outputStream.write("4\n".getBytes());
}
// Create a filter file for DistCp
String filterFilePath = "/tmp/filter";
try (FileWriter myWriter = new FileWriter(filterFilePath)) {
myWriter.write(".*file1.txt.*");
}
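// The regex above should exclude paths containing 'file1.txt' from the
// db-level DistCp copies; table-level copy tasks are expected to ignore
// this filter, as the assertions below verify.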
List<String> withClause = Arrays
.asList("'distcp.options.update'=''", "'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + runCopyTasksOnTarget + "'",
"'" + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS.varname + "'='" + fs.makeQualified(parentPath1) +
"'", "'hive.dbpath.distcp.options.filters'='" + filterFilePath + "'");
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create external table table1 (id int) row format delimited fields terminated by ',' location '"
+ parent1Table1.toUri() + "'")
.run("create external table table2 (id int) row format delimited fields terminated by ',' location '"
+ externalTablePath.toUri() + "'")
.run("create external table table3 (id int) row format delimited fields terminated by ','")
.run("insert into table1 values (1)")
.run("insert into table3 values (3)")
.dump(primaryDbName, withClause);
Path primaryDb = new Path(primary.getDatabase(primaryDbName).getLocationUri());
// Do a load and verify all the data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select id from table1 ")
.verifyResult("1")
.run("select id from table2")
.verifyResults(new String[] {"3", "4"})
.run("select id from table3")
.verifyResult("3");
// Verify the tasks created: one for the database path, one for the
// configured parent path, and a table-level task for table2.
ReplicationTestUtils.assertExternalFileList(
Arrays.asList("dbPath:" + primaryDb.getName(), "dbPath:" + parentPath1.getName(), "table2"), tuple.dumpLocation,
primary);
// Verify the filter applied to the db-level parent path: file1.txt should
// not have been copied to the target.
Path parent1Table1_target = new Path(REPLICA_EXTERNAL_BASE, parent1Table1.toUri().getPath().replaceFirst("/", ""));
assertFalse(fs.exists(new Path(parent1Table1_target, "file1.txt")));
// Verify the filter config is used only for db-level paths: table2 is
// copied by a table-level task, so its file1.txt is copied.
Path externalTablePath_target =
new Path(REPLICA_EXTERNAL_BASE, externalTablePath.toUri().getPath().replaceFirst("/", ""));
assertTrue(fs.exists(new Path(externalTablePath_target, "file1.txt")));
// Add more data to the tables, do an incremental run and check that the
// same set of tasks gets created.
tuple =
primary.run("use " + primaryDbName)
.run("insert into table1 values (4)")
.run("insert into table2 values (5)")
.run("insert into table3 values (2)")
.dump(primaryDbName, withClause);
// Do an incremental load and check if all the old and new data is there.
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("select id from table1 ")
.verifyResults(new String[] {"1", "4"})
.run("select id from table2 ")
.verifyResults(new String[] {"3", "4", "5"})
.run("select id from table3 ")
.verifyResults(new String[] {"3", "2"});
// The same tasks should be created.
ReplicationTestUtils.assertExternalFileList(
Arrays.asList("dbPath:" + primaryDb.getName(), "dbPath:" + parentPath1.getName(), "table2"), tuple.dumpLocation,
primary);
// Verify the filter behavior is unchanged after the incremental run.
assertFalse(fs.exists(new Path(parent1Table1_target, "file1.txt")));
assertTrue(fs.exists(new Path(externalTablePath_target, "file1.txt")));
// Clean up the filter file.
new File(filterFilePath).delete();
}
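/**
 * Exercises the table/partition export service with a mix of partitioned
 * managed tables and un-partitioned external tables, and verifies that the
 * external table file list written by bootstrap and incremental dumps covers
 * exactly the external tables.
 */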
@Test
public void testTableAndPartitionExportServiceWithParallelism() throws Throwable {
List<String> extTableList = new ArrayList<String>();
List<String> mgnTableList = new ArrayList<String>();
List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true);
primary.run("use " + primaryDbName);
// Create 5 managed partitioned and 5 external un-partitioned tables. The
// first two managed tables get 2 partitions each, the next two get 4, and
// the last one gets 6.
int pt = 0;
for (int i = 0; i < 5; i++) {
primary.run("CREATE EXTERNAL TABLE ptned" + i + " (a int)");
extTableList.add("ptned" + i);
primary.run("CREATE TABLE ptnmgned" + i + " (a int) partitioned by (b int)");
mgnTableList.add("ptnmgned" + i);
if (i % 2 == 0) {
pt += 2;
}
for (int j = 0; j < pt; j++) {
primary.run("ALTER TABLE ptnmgned" + i + " ADD PARTITION(b=" + j + ")");
// insert some rows into the external table and into each partition of the
// managed table
for (int k = 0; k < pt; k++) {
primary.run("INSERT INTO TABLE ptned" + i + " VALUES (" + k + ")");
primary.run("INSERT INTO TABLE ptnmgned" + i + " PARTITION(b=" + j + ") VALUES (" + k + ")");
}
}
}
// create 5 un-partitioned tables
for (int i = 0; i < 5; i++) {
primary.run("CREATE EXTERNAL TABLE unptned" + i + " (a int)");
extTableList.add("unptned" + i);
primary.run("CREATE TABLE unptnmgned" + i + " (a int)");
mgnTableList.add("unptnmgned" + i);
// insert some rows into each table
for (int j = 0; j < 2; j++) {
primary.run("INSERT INTO TABLE unptned" + i + " VALUES (" + j + ")");
primary.run("INSERT INTO TABLE unptnmgned" + i + " VALUES (" + j + ")");
}
}
//start bootstrap dump
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, dumpWithClause);
// verify that the external table filelist is written correctly for bootstrap
ReplicationTestUtils.assertExternalFileList(extTableList, tuple.dumpLocation, primary);
List<String> newTableList = new ArrayList<String>();
newTableList.addAll(extTableList);
newTableList.addAll(mgnTableList);
replica.load(replicatedDbName, primaryDbName, dumpWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(newTableList);
primary.run("use " + primaryDbName);
// create 5 more un-partitioned tables for the incremental dump
for (int i = 0; i < 5; i++) {
primary.run("CREATE EXTERNAL TABLE incrunptned" + i + "(a int)");
extTableList.add("incrunptned" + i);
primary.run("CREATE TABLE incrunptnmgned" + i + "(a int)");
mgnTableList.add("incrunptnmgned" + i);
// insert some rows into each table
for (int j = 0; j < 2; j++) {
primary.run("INSERT INTO TABLE incrunptned" + i + " VALUES (" + j + ")");
primary.run("INSERT INTO TABLE incrunptnmgned" + i + " VALUES (" + j + ")");
}
}
newTableList.clear();
newTableList.addAll(extTableList);
newTableList.addAll(mgnTableList);
//start incremental dump
WarehouseInstance.Tuple newTuple = primary.dump(primaryDbName, dumpWithClause);
replica.load(replicatedDbName, primaryDbName, dumpWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(newTableList);
// verify that the external table filelist is written correctly for incremental dump
ReplicationTestUtils.assertExternalFileList(extTableList, newTuple.dumpLocation, primary);
}
}