/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.util.DependencyResolver;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
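/**
 * End-to-end tests for Hive replication (REPL DUMP / REPL LOAD) across two warehouse instances:
 * a "primary" source and a "replica" target provided by {@link BaseReplicationAcrossInstances}.
 * Covers function replication, metadata-only dumps, bootstrap and incremental loads, retry after
 * partial load failures, and handling of the replication checkpoint properties.
 */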
public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
private static final String NS_REMOTE = "nsRemote";
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
internalBeforeClassSetup(overrides, TestReplicationScenariosAcrossInstances.class);
}
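/**
 * Verifies that functions created after a bootstrap dump are replicated by the next incremental
 * dump/load cycle, and that re-loading the same incremental dump is idempotent.
 */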
@Test
public void testCreateFunctionIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
primary.run("CREATE FUNCTION " + primaryDbName
+ ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'")
.run("CREATE FUNCTION " + primaryDbName
+ ".testFunctionTwo as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'");
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResults(new String[] { replicatedDbName + ".testFunctionOne",
replicatedDbName + ".testFunctionTwo" });
// Test the idempotent behavior of CREATE FUNCTION
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResults(new String[] { replicatedDbName + ".testFunctionOne",
replicatedDbName + ".testFunctionTwo" });
}
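/**
 * Verifies bootstrap and incremental replication of functions whose jars live on HDFS, and that
 * the replicated functions are usable in queries on the replica.
 */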
@Test
public void testCreateFunctionOnHDFSIncrementalReplication() throws Throwable {
Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc1 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf1HdfsPath.toString() + "'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResults(new String[] { replicatedDbName + ".idFunc1"})
.run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
.verifyResults(new String[] { "MyName"});
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc2 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf2HdfsPath.toString() + "'");
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResults(new String[] { replicatedDbName + ".idFunc1",
replicatedDbName + ".idFunc2" })
.run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
.verifyResults(new String[] { "YourName"});
}
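/**
 * Same scenario as the previous test, but with data copy tasks running on the target
 * (REPL_RUN_DATA_COPY_TASKS_ON_TARGET enabled) for both the dump and the load.
 */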
@Test
public void testCreateFunctionOnHDFSIncrementalReplicationLazyCopy() throws Throwable {
Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
List<String> withClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'");
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc1 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf1HdfsPath.toString() + "'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResults(new String[] { replicatedDbName + ".idFunc1"})
.run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
.verifyResults(new String[] { "MyName"});
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc2 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf2HdfsPath.toString() + "'");
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResults(new String[] { replicatedDbName + ".idFunc1",
replicatedDbName + ".idFunc2" })
.run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
.verifyResults(new String[] { "YourName"});
}
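/**
 * Simulates a bootstrap load failure after only one of two functions has been created, then
 * verifies that retrying with the same dump resumes the load and creates just the missing one.
 */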
@Test
public void testBootstrapReplLoadRetryAfterFailureForFunctions() throws Throwable {
String funcName1 = "f1";
String funcName2 = "f2";
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("CREATE FUNCTION " + primaryDbName + "." + funcName1 +
" as 'hivemall.tools.string.StopwordUDF' " +
"using jar 'ivy://io.github.myui:hivemall:0.4.0-2'")
.run("CREATE FUNCTION " + primaryDbName + "." + funcName2 +
" as 'hivemall.tools.string.SplitWordsUDF' "+
"using jar 'ivy://io.github.myui:hivemall:0.4.0-1'")
.dump(primaryDbName);
// Allow create function only on f1. Create should fail for the second function.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
@Override
public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
return false;
}
if (args.funcName != null) {
LOG.debug("Verifier - Function: " + String.valueOf(args.funcName));
return args.funcName.equals(funcName1);
}
return true;
}
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
// Trigger bootstrap load which creates only function f1 but not f2.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
"'hive.in.repl.test.files.sorted'='true'");
try {
replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
}
// Verify that only f1 got loaded
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult("null")
.run("show functions like '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + "." + funcName1);
// Verify that there are no calls to load f1; only f2 should be loaded.
callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
@Override
public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
return false;
}
if (args.funcName != null) {
LOG.debug("Verifier - Function: " + String.valueOf(args.funcName));
return args.funcName.equals(funcName2);
}
return true;
}
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
try {
// Retrying with the same dump that was already partially loaded should resume the bootstrap load.
// This time, it completes by adding just the function f2.
replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
}
// Verify that both the functions are available.
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("show functions like '" + replicatedDbName +"%'")
.verifyResults(new String[] {replicatedDbName + "." + funcName1,
replicatedDbName +"." +funcName2});
}
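/**
 * Verifies that DROP FUNCTION is replicated by an incremental dump/load cycle, and that
 * re-loading the same incremental dump is idempotent.
 */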
@Test
public void testDropFunctionIncrementalReplication() throws Throwable {
primary.run("CREATE FUNCTION " + primaryDbName
+ ".testFunctionAnother as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
primary.run("Drop FUNCTION " + primaryDbName + ".testFunctionAnother ");
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '%testfunctionanother%'")
.verifyResult(null);
// Test the idempotent behavior of DROP FUNCTION
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '%testfunctionanother%'")
.verifyResult(null);
}
@Test
public void testBootstrapFunctionReplication() throws Throwable {
primary.run("CREATE FUNCTION " + primaryDbName
+ ".testFunction as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".testFunction");
}
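/**
 * Creates a function from ivy-resolved jars copied to HDFS and verifies that the bootstrap load
 * copies every jar dependency under the replica's functions root.
 */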
@Test
public void testCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable {
Dependencies dependencies = dependencies("ivy://io.github.myui:hivemall:0.4.0-2", primary);
String jarSubString = dependencies.toJarSubSql();
primary.run("CREATE FUNCTION " + primaryDbName
+ ".anotherFunction as 'hivemall.tools.string.StopwordUDF' "
+ "using " + jarSubString);
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".anotherFunction");
FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus(
new Path(
replica.functionsRoot + "/" + replicatedDbName.toLowerCase() + "/anotherfunction/*/*")
, path -> path.toString().endsWith("jar"));
List<String> expectedDependenciesNames = dependencies.jarNames();
assertThat(fileStatuses.length, is(equalTo(expectedDependenciesNames.size())));
List<String> jars = Arrays.stream(fileStatuses).map(f -> {
String[] splits = f.getPath().toString().split("/");
return splits[splits.length - 1];
}).collect(Collectors.toList());
assertThat(jars, containsInAnyOrder(expectedDependenciesNames.toArray()));
}
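/**
 * Incremental counterpart of the previous test: the function is created after the bootstrap
 * cycle and its jar dependencies are verified on the replica after the incremental load.
 */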
@Test
public void testIncrementalCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
Dependencies dependencies = dependencies("ivy://io.github.myui:hivemall:0.4.0-2", primary);
String jarSubString = dependencies.toJarSubSql();
primary.run("CREATE FUNCTION " + primaryDbName
+ ".anotherFunction as 'hivemall.tools.string.StopwordUDF' "
+ "using " + jarSubString);
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".anotherFunction");
FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus(
new Path(
replica.functionsRoot + "/" + replicatedDbName.toLowerCase() + "/anotherfunction/*/*")
, path -> path.toString().endsWith("jar"));
List<String> expectedDependenciesNames = dependencies.jarNames();
assertThat(fileStatuses.length, is(equalTo(expectedDependenciesNames.size())));
List<String> jars = Arrays.stream(fileStatuses).map(f -> {
String[] splits = f.getPath().toString().split("/");
return splits[splits.length - 1];
}).collect(Collectors.toList());
assertThat(jars, containsInAnyOrder(expectedDependenciesNames.toArray()));
}
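/**
 * Helper holding the fully-qualified HDFS paths of a function's jar dependencies; renders the
 * "using jar '...'" clause for CREATE FUNCTION and lists the expected jar file names.
 */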
static class Dependencies {
private final List<Path> fullQualifiedJarPaths;
Dependencies(List<Path> fullQualifiedJarPaths) {
this.fullQualifiedJarPaths = fullQualifiedJarPaths;
}
private String toJarSubSql() {
return StringUtils.join(
fullQualifiedJarPaths.stream().map(p -> "jar '" + p + "'").collect(Collectors.toList()),
","
);
}
private List<String> jarNames() {
return fullQualifiedJarPaths.stream().map(p -> {
String[] splits = p.toString().split("/");
return splits[splits.length - 1];
}).collect(Collectors.toList());
}
}
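/**
 * Resolves an ivy:// coordinate to its jar dependencies, copies them to HDFS on the given
 * warehouse instance and returns their fully-qualified paths.
 */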
private Dependencies dependencies(String ivyPath, WarehouseInstance onWarehouse)
throws IOException, URISyntaxException, SemanticException {
List<URI> localUris = new DependencyResolver().downloadDependencies(new URI(ivyPath));
List<Path> remotePaths = onWarehouse.copyToHDFS(localUris);
List<Path> collect =
remotePaths.stream().map(r -> {
try {
return PathBuilder
.fullyQualifiedHDFSUri(r, onWarehouse.miniDFSCluster.getFileSystem());
} catch (Exception e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
return new Dependencies(collect);
}
/*
From the hive logs (hive.log) we can also check for the info statement:
fgrep "Total Tasks" [location of hive.log]
Each line indicates one run of the load task.
*/
@Test
public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("insert into t1 values (1), (2)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.run("create table t3 (rank int)")
.dump(primaryDbName);
// Each table creation itself takes more than one task; given that we allow a max of 1, we should hit multiple load runs.
List<String> withClause = Collections.singletonList(
"'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "t1", "t2", "t3" })
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("select country from t2 order by country")
.verifyResults(new String[] { "france", "india", "us" });
}
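/**
 * Runs a bootstrap load with hive.exec.parallel enabled to exercise parallel execution of the
 * generated load tasks, then restores the setting.
 */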
@Test
public void testParallelExecutionOfReplicationBootStrapLoad() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='australia') values ('sydney')")
.run("insert into table t2 partition(country='russia') values ('moscow')")
.run("insert into table t2 partition(country='uk') values ('london')")
.run("insert into table t2 partition(country='us') values ('sfo')")
.run("insert into table t2 partition(country='france') values ('paris')")
.run("insert into table t2 partition(country='japan') values ('tokyo')")
.run("insert into table t2 partition(country='china') values ('hkg')")
.run("create table t3 (rank int)")
.dump(primaryDbName);
replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("show tables")
.verifyResults(new String[] { "t1", "t2", "t3" })
.run("select country from t2")
.verifyResults(Arrays.asList("india", "australia", "russia", "uk", "us", "france", "japan",
"china"));
replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, false);
}
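/**
 * Verifies a metadata-only bootstrap dump/load: tables are created on the replica but carry no
 * data.
 */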
@Test
public void testMetadataBootstrapDump() throws Throwable {
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i int, j int)")
.run("insert into table1 values (1,2)")
.dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "acid_table", "table1" })
.run("select * from table1")
.verifyResults(Collections.emptyList());
}
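/**
 * Verifies metadata-only replication across a bootstrap and two incremental cycles covering
 * renames, new tables, schema changes, table properties and drops.
 */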
@Test
public void testIncrementalMetadataReplication() throws Throwable {
//////////// Bootstrap ////////////
WarehouseInstance.Tuple bootstrapTuple = primary
.run("use " + primaryDbName)
.run("create table table1 (i int, j int)")
.run("create table table2 (a int, city string) partitioned by (country string)")
.run("create table table3 (i int, j int)")
.run("insert into table1 values (1,2)")
.dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "table1", "table2", "table3" })
.run("select * from table1")
.verifyResults(Collections.emptyList());
//////////// First Incremental ////////////
WarehouseInstance.Tuple incrementalOneTuple =
primary
.run("use " + primaryDbName)
.run("alter table table1 rename to renamed_table1")
.run("insert into table2 partition(country='india') values (1,'mumbai') ")
.run("create table table4 (i int, j int)")
.dumpWithCommand(
"repl dump " + primaryDbName + " with ('hive.repl.dump.metadata.only'='true')"
);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
.run("select * from renamed_table1")
.verifyResults(Collections.emptyList())
.run("select * from table2")
.verifyResults(Collections.emptyList());
//////////// Second Incremental ////////////
WarehouseInstance.Tuple secondIncremental = primary
.run("alter table table2 add columns (zipcode int)")
.run("alter table table3 change i a string")
.run("alter table table3 set tblproperties('custom.property'='custom.value')")
.run("drop table renamed_table1")
.dumpWithCommand("repl dump " + primaryDbName + " with ('hive.repl.dump.metadata.only'='true')"
);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "table2", "table3", "table4" })
.run("desc table3")
.verifyResults(new String[] {
"a \tstring \t ",
"j \tint \t "
})
.run("desc table2")
.verifyResults(new String[] {
"a \tint \t ",
"city \tstring \t ",
"country \tstring \t ",
"zipcode \tint \t ",
"\t \t ",
"# Partition Information\t \t ",
"# col_name \tdata_type \tcomment ",
"country \tstring \t ",
})
.run("show tblproperties table3('custom.property')")
.verifyResults(new String[] {
"custom.property\tcustom.value"
});
}
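/**
 * Verifies metadata-only replication of a database that is not marked as a replication source,
 * across a bootstrap and one incremental cycle.
 */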
@Test
public void testNonReplDBMetadataReplication() throws Throwable {
String dbName = primaryDbName + "_metadata";
WarehouseInstance.Tuple tuple = primary
.run("create database " + dbName)
.run("use " + dbName)
.run("create table table1 (i int, j int)")
.run("create table table2 (a int, city string) partitioned by (country string)")
.run("create table table3 (i int, j int)")
.run("insert into table1 values (1,2)")
.dump(dbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, dbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"table1", "table2", "table3"})
.run("select * from table1")
.verifyResults(Collections.emptyList());
tuple = primary
.run("use " + dbName)
.run("alter table table1 rename to renamed_table1")
.run("insert into table2 partition(country='india') values (1,'mumbai') ")
.run("create table table4 (i int, j int)")
.dump(dbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, dbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
.run("select * from renamed_table1")
.verifyResults(Collections.emptyList())
.run("select * from table2")
.verifyResults(Collections.emptyList());
}
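/**
 * Dumps the whole warehouse (`*`) in metadata-only mode. Since primary and replica share the same
 * embedded metastore here, the source databases are dropped before loading; the load of `*` is
 * then expected to fail with "REPL LOAD * is not supported".
 */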
@Test
public void testBootStrapDumpOfWarehouse() throws Throwable {
//Clear the repl base dir
Path replBootstrapDumpDir = new Path(primary.hiveConf.get(MetastoreConf.ConfVars.REPLDIR.getHiveName()), "*");
replBootstrapDumpDir.getFileSystem(primary.hiveConf).delete(replBootstrapDumpDir, true);
String randomOne = RandomStringUtils.random(10, true, false);
String randomTwo = RandomStringUtils.random(10, true, false);
String dbOne = primaryDbName + randomOne;
String dbTwo = primaryDbName + randomTwo;
primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (i int, j int)")
.run("create database " + dbOne + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("use " + dbOne)
// TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables.
// This used to work by accident; now it works due to a test flag. The test needs to be fixed.
// Also applies for a couple more tests.
.run("create table t1 (i int, j int) partitioned by (load_date date) "
+ "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
.run("create database " + dbTwo + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("use " + dbTwo)
.run("create table t1 (i int, j int)")
.dump("`*`", Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
/*
Due to the limitation that we can only have one instance of the Persistence Manager Factory in a JVM,
we are not able to create multiple embedded Derby instances for two different MetaStore instances.
*/
primary.run("drop database " + primaryDbName + " cascade");
primary.run("drop database " + dbOne + " cascade");
primary.run("drop database " + dbTwo + " cascade");
/*
End of additional steps
*/
// Reset the ckpt and last repl ID keys to empty strings to allow the bootstrap load.
replica.run("show databases")
.verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
.run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')");
try {
replica.load("", "`*`");
} catch (SemanticException e) {
assertEquals("REPL LOAD * is not supported", e.getMessage());
}
}
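/**
 * Runs REPL LOAD on the primary itself, pointing it at the replica's metastore, warehouse dir and
 * functions root via a WITH clause, across one bootstrap and two incremental cycles.
 */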
@Test
public void testReplLoadFromSourceUsingWithClause() throws Throwable {
HiveConf replicaConf = replica.getConf();
List<String> withConfigs = Arrays.asList(
"'hive.metastore.warehouse.dir'='" + replicaConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) + "'",
"'hive.metastore.uris'='" + replicaConf.getVar(HiveConf.ConfVars.METASTOREURIS) + "'",
"'hive.repl.replica.functions.root.dir'='" + replicaConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR) + "'");
//////////// Bootstrap ////////////
WarehouseInstance.Tuple bootstrapTuple = primary
.run("use " + primaryDbName)
.run("create table table1 (i int)")
.run("create table table2 (id int) partitioned by (country string)")
.run("insert into table1 values (1)")
.dump(primaryDbName);
// Run load on primary itself
primary.load(replicatedDbName, primaryDbName, withConfigs)
.status(replicatedDbName, withConfigs)
.verifyResult(bootstrapTuple.lastReplicationId);
replica.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "table1", "table2" })
.run("select * from table1")
.verifyResults(new String[]{ "1" })
.verifyReplTargetProperty(replicatedDbName);
//////////// First Incremental ////////////
WarehouseInstance.Tuple incrementalOneTuple = primary
.run("use " + primaryDbName)
.run("alter table table1 rename to renamed_table1")
.run("insert into table2 partition(country='india') values (1) ")
.run("insert into table2 partition(country='usa') values (2) ")
.run("create table table3 (i int)")
.run("insert into table3 values(10)")
.run("create function " + primaryDbName
+ ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'")
.dump(primaryDbName, Collections.emptyList());
// Run load on primary itself
primary.load(replicatedDbName, primaryDbName, withConfigs)
.status(replicatedDbName, withConfigs)
.verifyResult(incrementalOneTuple.lastReplicationId);
replica.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "renamed_table1", "table2", "table3" })
.run("select * from renamed_table1")
.verifyResults(new String[] { "1" })
.run("select id from table2 order by id")
.verifyResults(new String[] { "1", "2" })
.run("select * from table3")
.verifyResults(new String[] { "10" })
.run("show functions like '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".testFunctionOne")
.verifyReplTargetProperty(replicatedDbName);
//////////// Second Incremental ////////////
WarehouseInstance.Tuple secondIncremental = primary
.run("use " + primaryDbName)
.run("alter table table2 add columns (zipcode int)")
.run("alter table table3 set tblproperties('custom.property'='custom.value')")
.run("drop table renamed_table1")
.run("alter table table2 drop partition(country='usa')")
.run("truncate table table3")
.run("drop function " + primaryDbName + ".testFunctionOne ")
.dump(primaryDbName, Collections.emptyList());
// Run load on primary itself
primary.load(replicatedDbName, primaryDbName, withConfigs)
.status(replicatedDbName, withConfigs)
.verifyResult(secondIncremental.lastReplicationId);
replica.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "table2", "table3"})
.run("desc table2")
.verifyResults(new String[] {
"id \tint \t ",
"country \tstring \t ",
"zipcode \tint \t ",
"\t \t ",
"# Partition Information\t \t ",
"# col_name \tdata_type \tcomment ",
"country \tstring \t ",
})
.run("show tblproperties table3('custom.property')")
.verifyResults(new String[] { "custom.property\tcustom.value" })
.run("select id from table2 order by id")
.verifyResults(new String[] { "1" })
.run("select * from table3")
.verifyResults(Collections.emptyList())
.run("show functions like '" + replicatedDbName + "%'")
.verifyResult(null)
.verifyReplTargetProperty(replicatedDbName);
}
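/**
 * Verifies that an incremental batch which drops and re-creates tables (and adds/drops
 * partitions) in between inserts replays correctly on the replica.
 */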
@Test
public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Throwable {
// Bootstrap dump with empty db
WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
// Bootstrap load in replica
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId);
// First incremental dump
WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
.run("create table table1 (i int)")
.run("create table table2 (id int) partitioned by (country string)")
.run("insert into table1 values (1)")
.run("insert into table2 partition(country='india') values(1)")
.dump(primaryDbName, Collections.emptyList());
// First incremental load
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(firstIncremental.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"table1", "table2"})
.run("select * from table1")
.verifyResults(new String[] {"1"})
.run("select id from table2 order by id")
.verifyResults(new String[] {"1"});
// Second incremental dump
WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
.run("drop table table1")
.run("drop table table2")
.run("create table table2 (id int) partitioned by (country string)")
.run("alter table table2 add partition(country='india')")
.run("alter table table2 drop partition(country='india')")
.run("insert into table2 partition(country='us') values(2)")
.run("create table table1 (i int)")
.run("insert into table1 values (2)")
.dump(primaryDbName, Collections.emptyList());
// Second incremental load
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(secondIncremental.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"table1", "table2"})
.run("select * from table1")
.verifyResults(new String[] {"2"})
.run("select id from table2 order by id")
.verifyResults(new String[] {"2"});
}
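/**
 * Verifies that dropping tables and re-creating them with a different partitioning scheme within
 * one incremental batch replays correctly on the replica.
 */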
@Test
public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndInsert() throws Throwable {
// Bootstrap dump with empty db
WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
// Bootstrap load in replica
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId);
// First incremental dump
WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
.run("create table table1 (id int) partitioned by (country string)")
.run("create table table2 (id int)")
.run("create table table3 (id int) partitioned by (country string)")
.run("insert into table1 partition(country='india') values(1)")
.run("insert into table2 values(2)")
.run("insert into table3 partition(country='india') values(3)")
.dump(primaryDbName, Collections.emptyList());
// First incremental load
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(firstIncremental.lastReplicationId)
.run("use " + replicatedDbName)
.run("select id from table1")
.verifyResults(new String[] {"1"})
.run("select * from table2")
.verifyResults(new String[] {"2"})
.run("select id from table3")
.verifyResults(new String[] {"3"});
// Second incremental dump
WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
.run("drop table table1")
.run("drop table table2")
.run("drop table table3")
.run("create table table1 (id int)")
.run("insert into table1 values (10)")
.run("create table table2 (id int) partitioned by (country string)")
.run("insert into table2 partition(country='india') values(20)")
.run("create table table3 (id int) partitioned by (name string, rank int)")
.run("insert into table3 partition(name='adam', rank=100) values(30)")
.dump(primaryDbName, Collections.emptyList());
// Second incremental load
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(secondIncremental.lastReplicationId)
.run("use " + replicatedDbName)
.run("select * from table1")
.verifyResults(new String[] { "10" })
.run("select id from table2")
.verifyResults(new String[] { "20" })
.run("select id from table3")
.verifyResults(new String[] {"30"});
}
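/**
 * Non-native (storage-handler backed) tables should not get a data directory in the dump.
 */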
@Test
public void testShouldNotCreateDirectoryForNonNativeTableInDumpDirectory() throws Throwable {
String createTableQuery =
"CREATE TABLE custom_serdes( serde_id bigint COMMENT 'from deserializer', name string "
+ "COMMENT 'from deserializer', slib string COMMENT 'from deserializer') "
+ "ROW FORMAT SERDE 'org.apache.hive.storage.jdbc.JdbcSerDe' "
+ "STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' "
+ "WITH SERDEPROPERTIES ('serialization.format'='1') "
+ "TBLPROPERTIES ( "
+ "'hive.sql.database.type'='METASTORE', "
+ "'hive.sql.query'='SELECT \"SERDE_ID\", \"NAME\", \"SLIB\" FROM \"SERDES\"')";
WarehouseInstance.Tuple bootstrapTuple = primary
.run("use " + primaryDbName)
.run(createTableQuery).dump(primaryDbName);
Path cSerdesTableDumpLocation = new Path(
new Path(bootstrapTuple.dumpLocation, primaryDbName),
"custom_serdes");
FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf);
assertFalse(fs.exists(cSerdesTableDumpLocation));
}
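/**
 * Unlike the previous test, with a metadata-only dump the non-native table's metadata is dumped
 * and the table is created on the replica.
 */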
@Test
public void testShouldDumpMetaDataForNonNativeTableIfSetMetaDataOnly() throws Throwable {
String tableName = testName.getMethodName() + "_table";
String createTableQuery =
"CREATE TABLE " + tableName + " ( serde_id bigint COMMENT 'from deserializer', name string "
+ "COMMENT 'from deserializer', slib string COMMENT 'from deserializer') "
+ "ROW FORMAT SERDE 'org.apache.hive.storage.jdbc.JdbcSerDe' "
+ "STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' "
+ "WITH SERDEPROPERTIES ('serialization.format'='1') "
+ "TBLPROPERTIES ( "
+ "'hive.sql.database.type'='METASTORE', "
+ "'hive.sql.query'='SELECT \"SERDE_ID\", \"NAME\", \"SLIB\" FROM \"SERDES\"')";
WarehouseInstance.Tuple bootstrapTuple = primary
.run("use " + primaryDbName)
.run(createTableQuery)
.dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
// Bootstrap load in replica
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResult(tableName);
}
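/**
 * Helpers asserting the presence or absence of the replication checkpoint and
 * source-of-replication properties on database/table/partition parameters.
 */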
private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
String hiveDumpDir = dumpDir + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(hiveDumpDir));
}
private void verifyIfCkptPropMissing(Map<String, String> props) {
assertFalse(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
}
private void verifyIfSrcOfReplPropMissing(Map<String, String> props) {
assertFalse(props.containsKey(SOURCE_OF_REPLICATION));
}
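/**
 * Covers incremental dumps that contain no events for the replicated database: the load should
 * still advance the repl status, while a bootstrap load from an empty dump directory must fail
 * with a "no data to load" error.
 */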
@Test
public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
tuple = primary.dump(primaryDbName, Collections.emptyList());
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
// Create events for some other database and then dump primaryDbName, which produces an empty dump directory.
String testDbName = primaryDbName + "_test";
tuple = primary.run(" create database " + testDbName)
.run("create table " + testDbName + ".tbl (fld int)")
.dump(primaryDbName, Collections.emptyList());
// Incremental load to an existing database from an empty dump directory should set the repl id to the last event at the source.
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
// Bootstrap load from an empty dump directory should fail with a 'no data to load' error.
tuple = primary.dump("someJunkDB", Collections.emptyList());
try {
replica.runCommand("REPL LOAD someJunkDB into someJunkDB");
assert false;
} catch (CommandProcessorException e) {
assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase()));
}
primary.run(" drop database if exists " + testDbName + " cascade");
}
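/**
 * Forces the incremental load to run in multiple iterations by capping
 * hive.repl.approx.max.load.tasks and checks the iteration count reported by
 * IncrementalLoadTasksBuilder.
 */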
@Test
public void testIncrementalDumpMultiIteration() throws Throwable {
WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId);
WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
.run("create table table1 (id int) partitioned by (country string)")
.run("create table table2 (id int)")
.run("create table table3 (id int) partitioned by (country string)")
.run("insert into table1 partition(country='india') values(1)")
.run("insert into table2 values(2)")
.run("insert into table3 partition(country='india') values(3)")
.dump(primaryDbName, Collections.emptyList());
replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.approx.max.load.tasks'='10'"))
.status(replicatedDbName)
.verifyResult(incremental.lastReplicationId)
.run("use " + replicatedDbName)
.run("select id from table1")
.verifyResults(new String[] {"1" })
.run("select * from table2")
.verifyResults(new String[] {"2" })
.run("select id from table3")
.verifyResults(new String[] {"3" });
assert(IncrementalLoadTasksBuilder.getNumIteration() > 1);
incremental = primary.run("use " + primaryDbName)
.run("create table table5 (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc")
.run("create table table4 (i int, j int)")
.run("insert into table4 values (1,2)")
.dump(primaryDbName, Collections.singletonList("'hive.repl.include.external.tables'='false'"));
String hiveDumpDir = incremental.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
Path path = new Path(hiveDumpDir);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] fileStatus = fs.listStatus(path);
int numEvents = fileStatus.length - 3; //for _metadata, _finished_dump and _events_dump
replica.load(replicatedDbName, primaryDbName,
Arrays.asList("'hive.repl.approx.max.load.tasks'='1','hive.repl.include.external.tables'='false'"))
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
.run("select i from table4")
.verifyResult("1");
Assert.assertEquals(IncrementalLoadTasksBuilder.getNumIteration(), numEvents);
}
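/**
 * Replicates A -> B -> C and verifies that the checkpoint and repl.source.for properties written
 * by REPL LOAD are not carried forward when the intermediate database B is dumped again.
 */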
@Test
public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable {
WarehouseInstance.Tuple tuplePrimary = primary
.run("use " + primaryDbName)
.run("create table t1 (place string) partitioned by (country string) "
+ " tblproperties('custom.property'='custom.value')")
.run("insert into table t1 partition(country='india') values ('bangalore')")
.dump(primaryDbName);
// Bootstrap Repl A -> B
replica.load(replicatedDbName, primaryDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuplePrimary.lastReplicationId)
.run("show tblproperties t1('custom.property')")
.verifyResults(new String[] { "custom.property\tcustom.value" })
.dumpFailure(replicatedDbName)
.run("alter database " + replicatedDbName
+ " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')")
.dumpFailure(replicatedDbName); // Cannot dump the db before the first successful incremental load is done.
// Do an empty incremental load to allow a dump of replicatedDbName.
WarehouseInstance.Tuple temp = primary.dump(primaryDbName, Collections.emptyList());
replica.load(replicatedDbName, primaryDbName); // first successful incremental load.
// Bootstrap Repl B -> C
WarehouseInstance.Tuple tupleReplica = replica.run("alter database " + replicatedDbName
+ " set dbproperties ('" + TARGET_OF_REPLICATION + "' = '')").dump(replicatedDbName);
String replDbFromReplica = replicatedDbName + "_dupe";
replica.load(replDbFromReplica, replicatedDbName)
.run("use " + replDbFromReplica)
.run("repl status " + replDbFromReplica)
.verifyResult(tupleReplica.lastReplicationId)
.run("show tables")
.verifyResults(new String[] { "t1" })
.run("select country from t1")
.verifyResults(Arrays.asList("india"))
.run("show tblproperties t1('custom.property')")
.verifyResults(new String[] { "custom.property\tcustom.value" });
// Check that DB/table/partition in C don't have the repl.source.for property. Also ensure the ckpt
// property is set to the bootstrap dump location used in C.
Database db = replica.getDatabase(replDbFromReplica);
verifyIfSrcOfReplPropMissing(db.getParameters());
verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
Table t1 = replica.getTable(replDbFromReplica, "t1");
verifyIfCkptSet(t1.getParameters(), tupleReplica.dumpLocation);
Partition india = replica.getPartition(replDbFromReplica, "t1", Collections.singletonList("india"));
verifyIfCkptSet(india.getParameters(), tupleReplica.dumpLocation);
// Perform alters in A for incremental replication
WarehouseInstance.Tuple tuplePrimaryInc = primary.run("use " + primaryDbName)
.run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')")
.run("alter table t1 set tblproperties('dummy_key'='dummy_val')")
.run("alter table t1 partition(country='india') set fileformat orc")
.dump(primaryDbName, Collections.emptyList());
// Incremental Repl A -> B with alters on db/table/partition
WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, primaryDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuplePrimaryInc.lastReplicationId)
.dump(replicatedDbName, Collections.emptyList());
// Check that the DB in B has the ckpt property set to the bootstrap dump location used in B, and that it is missing for table/partition.
db = replica.getDatabase(replicatedDbName);
verifyIfCkptSet(db.getParameters(), tuplePrimary.dumpLocation);
t1 = replica.getTable(replicatedDbName, "t1");
verifyIfCkptPropMissing(t1.getParameters());
india = replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india"));
verifyIfCkptPropMissing(india.getParameters());
// Incremental Repl B -> C with alters on db/table/partition
replica.load(replDbFromReplica, replicatedDbName)
.run("use " + replDbFromReplica)
.run("repl status " + replDbFromReplica)
.verifyResult(tupleReplicaInc.lastReplicationId)
.run("show tblproperties t1('custom.property')")
.verifyResults(new String[] { "custom.property\tcustom.value" });
// Check that DB/table/partition in C don't have the repl.source.for property. Also ensure the ckpt
// property in the DB is set to the bootstrap dump location used in C but is missing for table/partition.
db = replica.getDatabase(replDbFromReplica);
verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
verifyIfSrcOfReplPropMissing(db.getParameters());
t1 = replica.getTable(replDbFromReplica, "t1");
verifyIfCkptPropMissing(t1.getParameters());
india = replica.getPartition(replDbFromReplica, "t1", Collections.singletonList("india"));
verifyIfCkptPropMissing(india.getParameters());
replica.run("drop database if exists " + replDbFromReplica + " cascade");
}
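/**
 * Verifies that EXPORT/IMPORT of a replicated table does not carry over the checkpoint property
 * that REPL LOAD set on the table and its partitions.
 */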
@Test
public void testIfCkptPropIgnoredByExport() throws Throwable {
WarehouseInstance.Tuple tuplePrimary = primary
.run("use " + primaryDbName)
.run("create table t1 (place string) partitioned by (country string)")
.run("insert into table t1 partition(country='india') values ('bangalore')")
.dump(primaryDbName);
// Bootstrap Repl A -> B and then export table t1
String path = "hdfs:///tmp/" + replicatedDbName + "/";
String exportPath = "'" + path + "1/'";
replica.load(replicatedDbName, primaryDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuplePrimary.lastReplicationId)
.run("use " + replicatedDbName)
.run("export table t1 to " + exportPath);
// Check that the ckpt property is set on table/partition in B after bootstrap load.
Table t1 = replica.getTable(replicatedDbName, "t1");
verifyIfCkptSet(t1.getParameters(), tuplePrimary.dumpLocation);
Partition india = replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india"));
verifyIfCkptSet(india.getParameters(), tuplePrimary.dumpLocation);
// Import table t1 to C
String importDbFromReplica = replicatedDbName + "_dupe";
replica.run("create database " + importDbFromReplica)
.run("use " + importDbFromReplica)
.run("import table t1 from " + exportPath)
.run("select country from t1")
.verifyResults(Collections.singletonList("india"));
// Check that table/partition in C don't have the ckpt property.
t1 = replica.getTable(importDbFromReplica, "t1");
verifyIfCkptPropMissing(t1.getParameters());
india = replica.getPartition(importDbFromReplica, "t1", Collections.singletonList("india"));
verifyIfCkptPropMissing(india.getParameters());
replica.run("drop database if exists " + importDbFromReplica + " cascade");
}
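/**
 * Once a bootstrap load has completed, retrying the same bootstrap dump (after deleting the load
 * acknowledgement) must fail, whether or not the target database still holds the tables.
 */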
@Test
public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("insert into table t1 values (10)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='uk') values ('london')")
.run("insert into table t2 partition(country='us') values ('sfo')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("show tables")
.verifyResults(new String[] { "t1", "t2" })
.run("select id from t1")
.verifyResults(Collections.singletonList("10"))
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("india", "uk", "us"));
String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
// To retry with the same dump, delete the load ack.
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(
hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true);
// Retrying with the same dump that was already loaded also fails.
replica.loadFailure(replicatedDbName, primaryDbName);
// To retry with the same dump, delete the load ack.
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(
hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true);
// Retrying from the same dump when the database is empty is also not allowed.
replica.run("drop table t1")
.run("drop table t2")
.loadFailure(replicatedDbName, primaryDbName);
}
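/**
 * Simulates repeated bootstrap load failures while tables and constraints are being created, and
 * verifies that each retry with the same dump resumes where the previous attempt stopped until
 * all tables and constraints are loaded.
 */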
@Test
public void testBootstrapReplLoadRetryAfterFailureForTablesAndConstraints() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1(a string, b string, primary key (a, b) disable novalidate rely)")
.run("create table t2(a string, b string, foreign key (a, b) references t1(a, b) disable novalidate)")
.run("create table t3(a string, b string not null disable, unique (a) disable)")
.dump(primaryDbName);
// Need to drop the primary DB as the metastore is shared by both primary/replica, so constraints
// conflict when loaded. This is an issue with the test framework that needs to be looked into later.
primary.run("drop database if exists " + primaryDbName + " cascade");
// Allow create table only on t1. Create should fail for the rest of the tables, and hence their
// constraints are also not loaded.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
@Override
public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
+ " Constraint Table: " + String.valueOf(args.constraintTblName));
return false;
}
if (args.tblName != null) {
LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
return args.tblName.equals("t1");
}
return true;
}
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
// Trigger bootstrap load which creates only table t1; the other tables (t2, t3) and constraints are not loaded.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
try {
replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
}
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult("null");
assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size());
assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size());
assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());
// Verify that create table is not called for table t1 but is called for t2 and t3.
// Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
@Override
public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName != null)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName));
return false;
}
if (args.constraintTblName != null) {
LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName));
return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3"));
}
return true;
}
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
try {
// Retrying with the same dump that was already partially loaded should resume the bootstrap load.
// This time, it fails when trying to load the foreign key constraints. All other constraints are loaded.
replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
}
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult("null")
.run("show tables")
.verifyResults(new String[] { "t1", "t2", "t3" });
assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, "t3").size());
assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, "t3").size());
assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());
// Verify that there are no create table/function calls. Only foreign key constraints are added on table t2.
callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
@Override
public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
+ " Table: " + String.valueOf(args.tblName));
return false;
}
if (args.constraintTblName != null) {
LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName));
return args.constraintTblName.equals("t2");
}
return true;
}
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
try {
// Retrying with the same dump that was already partially loaded should resume the bootstrap load.
// This time, it completes by adding just the foreign key constraints for table t2.
replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
}
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("show tables")
.verifyResults(new String[] { "t1", "t2", "t3" });
assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, "t3").size());
assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, "t3").size());
assertEquals(2, replica.getForeignKeyList(replicatedDbName, "t2").size());
}
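/**
 * Simulates a bootstrap load failure while partitions are being added, and verifies that a retry
 * with the same dump loads the remaining partitions and the function without re-creating tables.
 */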
@Test
public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='uk') values ('london')")
.run("insert into table t2 partition(country='us') values ('sfo')")
.run("CREATE FUNCTION " + primaryDbName
+ ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'")
.dump(primaryDbName);
// Inject a behavior where REPL LOAD fails when trying to load table "t2" and partition "uk".
// So, table "t2" and partition "india" will exist; the rest failed as the operation failed.
BehaviourInjection<List<Partition>, Boolean> alterPartitionStub
= new BehaviourInjection<List<Partition>, Boolean>() {
@Override
public Boolean apply(List<Partition> ptns) {
for (Partition ptn : ptns) {
if (ptn.getValues().get(0).equals("india")) {
injectionPathCalled = true;
LOG.warn("####getPartition Stub called");
return false;
}
}
return true;
}
};
InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(alterPartitionStub);
// Make sure that there's some order in which the objects are loaded.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
"'hive.in.repl.test.files.sorted'='true'");
replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(null); // reset the behaviour
alterPartitionStub.assertInjectionsPerformed(true, false);
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult("null")
.run("show tables")
.verifyResults(new String[] {"t2" })
.run("select country from t2 order by country")
.verifyResults(Collections.singletonList("india"));
// Verify that there are no create table calls. Add-partition and create-function calls are expected.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
@Nullable
@Override
public Boolean apply(@Nullable CallerArguments args) {
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) {
injectionPathCalled = true;
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
+ " Table: " + String.valueOf(args.tblName));
return false;
}
return true;
}
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
try {
// Retrying with the same dump that was already partially loaded should resume the bootstrap load.
// This time, it completes by adding the remaining partitions and the function.
replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(false, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
}
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("show tables")
.verifyResults(new String[] { "t2" })
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("india", "uk", "us"))
.run("show functions like '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".testFunctionOne");
}
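// Bootstrap variant of the move-optimization retry scenario: the shared testMoveOptimization helper fails the
// ADD_PARTITION notification for table t2 and then verifies that a retried load succeeds.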
@Test
public void testMoveOptimizationBootstrapReplLoadRetryAfterFailure() throws Throwable {
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName);
testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2",
"ADD_PARTITION", tuple);
}
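// Incremental variant: both replicas are bootstrapped from the same dump, an insert overwrite produces the
// incremental events, and the helper fails the ADD_PARTITION notification for table t1 before retrying.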
@Test
public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable {
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("create table t1 (place string) partitioned by (country string)")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName, withConfigs);
//delete load ack to reuse the dump
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
+ LOAD_ACKNOWLEDGEMENT.toString()), true);
replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
tuple = primary.run("use " + primaryDbName)
.run("insert overwrite table t1 select * from t2")
.dump(primaryDbName, Collections.emptyList());
testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION",
tuple);
}
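// Same as above, but the incremental event is a plain INSERT into an existing partition of t2.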
@Test
public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable {
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("ALTER TABLE t2 ADD PARTITION (country='india')")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName, withConfigs);
//delete load ack to reuse the dump
new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true);
replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("insert into table t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName, Collections.emptyList());
testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple);
}
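// Shared helper: with move optimization enabled, fail the add-notification call for the given event type on the
// given table so that REPL LOAD fails, then retry the load on both the regular and the "_CM" replica and verify
// the replicated data.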
private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
// Fail the add-notification call for the given event type on the given table.
BehaviourInjection<NotificationEvent, Boolean> callerVerifier
= new BehaviourInjection<NotificationEvent, Boolean>() {
@Override
public Boolean apply(NotificationEvent entry) {
if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
injectionPathCalled = true;
LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
+ " Table: " + String.valueOf(entry.getTableName())
+ " Event: " + String.valueOf(entry.getEventType()));
return false;
}
return true;
}
};
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
replica.loadFailure(replicaDb, primaryDbName, withConfigs);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
replica.load(replicaDb, primaryDbName, withConfigs);
replica.run("use " + replicaDb)
.run("select country from " + tbl + " where country == 'india'")
.verifyResults(Arrays.asList("india"));
primary.run("use " + primaryDb)
.run("drop table " + tbl);
//delete load ack to reuse the dump
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
+ LOAD_ACKNOWLEDGEMENT.toString()), true);
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("use " + replicatedDbName_CM)
.run("select country from " + tbl + " where country == 'india'")
.verifyResults(Arrays.asList("india"))
.run("drop database if exists " + replicatedDbName_CM + " cascade");
}
// This test requires that the tables are loaded in a fixed, sorted order.
@Test
public void testBootstrapLoadRetryAfterFailureForAlterTable() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (place string)")
.run("insert into table t1 values ('testCheck')")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='china') values ('shenzhen')")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName);
// Fail setting the ckpt directory property for table t1.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
@Nullable
@Override
public Boolean apply(@Nullable CallerArguments args) {
if (args.tblName.equalsIgnoreCase("t1") && args.dbName.equalsIgnoreCase(replicatedDbName)) {
injectionPathCalled = true;
LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
return false;
}
return true;
}
};
// Fail repl load before the ckpt property is set for t1 and after it is set for t2. So in the next run,
// t2 goes directly to partition load with no task in the table tracker, while t1 is loaded again from the start.
InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
try {
replica.loadFailure(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetAlterTableModifier();
}
// Retrying with the same dump that was already partially loaded should resume the bootstrap load. Make sure that
// table t1 is loaded before t2, so that the scope is set to table in the first iteration for t1. In the next
// iteration, only the remaining partitions of t2 are loaded, so the table tracker has no tasks.
List<String> withConfigs = Arrays.asList("'hive.in.repl.test.files.sorted'='true'");
replica.load(replicatedDbName, primaryDbName, withConfigs);
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("china", "india"));
}
/*
Can't test complete replication as mini Ranger is not supported.
Tests just the configs and that there is no impact on existing replication.
*/
@Test
public void testRangerReplication() throws Throwable {
List<String> clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'",
"'hive.in.test'='true'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});
}
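// With lazy data copy and the remote nameservice clause, the load should fail because the configured remote
// nameservice (NS_REMOTE) cannot be resolved.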
@Test
public void testHdfsNameserviceLazyCopy() throws Throwable {
List<String> clause = getHdfsNameserviceClause();
clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='true'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i int)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.run("create external table ext_table1 (id int)")
.run("insert into ext_table1 values (3)")
.run("insert into ext_table1 values (4)")
.dump(primaryDbName, clause);
try{
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}
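// Same nameservice scenario across an incremental cycle: the bootstrap load succeeds, but the incremental load
// with the remote nameservice clause fails to resolve NS_REMOTE.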
@Test
public void testHdfsNameserviceLazyCopyIncr() throws Throwable {
List<String> clause = getHdfsNameserviceClause();
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});
primary.run("use " + primaryDbName)
.run("insert into table1 values (3)")
.dump(primaryDbName, clause);
try{
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}
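// When data copy tasks do not run on the target (data is copied to staging during dump), the nameservice
// replacement parameters have no effect and both the bootstrap and incremental cycles succeed.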
@Test
public void testHdfsNameserviceWithDataCopy() throws Throwable {
List<String> clause = getHdfsNameserviceClause();
//NS replacement parameters have no effect when the data is also copied to staging.
clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='false'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});
primary.run("use " + primaryDbName)
.run("insert into table1 values (3)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2", "3"});
}
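// Creates a function whose jar lives on HDFS; with the remote nameservice clause the load should fail to
// resolve NS_REMOTE.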
@Test
public void testCreateFunctionWithHdfsNameservice() throws Throwable {
Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
List<String> clause = getHdfsNameserviceClause();
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc1 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf1HdfsPath.toString() + "'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, clause);
try{
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}
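// With authorization metadata enabled but no test stubs (hive.in.test=false) and a short retry window, the dump
// exhausts its retries, leaves a non-recoverable marker, and only succeeds again once the marker is removed and
// the test configs are used.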
@Test
public void testRangerReplicationRetryExhausted() throws Throwable {
List<String> clause = Arrays.asList(
"'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname + "'='1s'",
"'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.varname + "'='30s'",
"'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
"'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'");
List<String> testClause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'",
"'hive.in.test'='true'");
try {
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName, clause);
Assert.fail();
} catch (Exception e) {
Assert.assertEquals(ErrorMsg.REPL_RETRY_EXHAUSTED.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
//This is now a non-recoverable error.
try {
primary.dump(primaryDbName, clause);
Assert.fail();
} catch (Exception e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
//Delete non recoverable marker to fix this
Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
//This should pass as non recoverable marker removed and valid configs present.
primary.dump(primaryDbName, testClause);
}
/*
Can't test complete replication as mini Ranger is not supported.
Tests just the configs and that there is no impact on existing replication.
*/
@Test
public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
List<String> clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'",
"'hive.in.test'='true'", "'hive.repl.authorization.provider.service'='sentry'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)");
try {
primary.dump(primaryDbName, clause);
Assert.fail();
} catch (SemanticException e) {
assertEquals("Invalid config error : Authorizer sentry not supported for replication " +
"for ranger service.", e.getMessage());
assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
//This is now a non-recoverable error.
try {
primary.dump(primaryDbName, clause);
Assert.fail();
} catch (Exception e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
try {
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail();
} catch (Exception e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
//Delete non recoverable marker to fix this
Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
Assert.assertFalse(baseDumpDir.getFileSystem(primary.hiveConf).exists(nonRecoverablePath));
//This should pass as non recoverable marker removed and valid configs present.
WarehouseInstance.Tuple dump = primary.dump(primaryDbName);
String stackTrace = null;
try {
replica.load(replicatedDbName, primaryDbName, clause);
} catch (Exception e) {
Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
stackTrace = ExceptionUtils.getStackTrace(e);
}
//This is now a non-recoverable error.
try {
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail();
} catch (Exception e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
try {
replica.dump(primaryDbName, clause);
Assert.fail();
} catch (Exception e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
nonRecoverablePath = new Path(dump.dumpLocation, NON_RECOVERABLE_MARKER.toString());
Assert.assertNotNull(nonRecoverablePath);
//check non recoverable stack trace
String actualStackTrace = readStackTrace(nonRecoverablePath, primary.hiveConf);
Assert.assertEquals(stackTrace, actualStackTrace);
//Delete non recoverable marker to fix this
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
//This should pass now
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});
}
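// Reads the stack trace written to the non-recoverable marker file, or returns null if it cannot be read.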
private String readStackTrace(Path nonRecoverablePath, HiveConf conf) {
try {
FileSystem fs = FileSystem.get(conf);
try (FSDataInputStream in = fs.open(nonRecoverablePath);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
StringBuilder builder = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
builder.append(line).append("\n");
}
return builder.toString();
}
} catch (IOException e) {
return null;
}
}
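// Returns the non-recoverable marker path under the database's dump directory (the directory name is the
// base64-encoded lowercase db name), or null if no dump root exists yet.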
private Path getNonRecoverablePath(Path dumpDir, String dbName) throws IOException {
Path dumpPath = new Path(dumpDir,
Base64.getEncoder().encodeToString(dbName.toLowerCase()
.getBytes(StandardCharsets.UTF_8.name())));
FileSystem fs = dumpPath.getFileSystem(conf);
if (fs.exists(dumpPath)) {
FileStatus[] statuses = fs.listStatus(dumpPath);
if (statuses.length > 0) {
return new Path(statuses[0].getPath(), NON_RECOVERABLE_MARKER.toString());
}
}
return null;
}
//Testing just the configs and no impact on existing replication
@Test
public void testAtlasReplication() throws Throwable {
Map<String, String> confMap = defaultAtlasConfMap();
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
verifyAtlasMetadataPresent();
confMap.remove("hive.repl.atlas.replicatedto");
replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});
}
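// Walks through the mandatory Atlas replication configs one by one: each missing config fails the dump (and
// later the load) and leaves a non-recoverable marker that must be removed before the next attempt.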
@Test
public void testAtlasMissingConfigs() throws Throwable {
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)");
Map<String, String> confMap = new HashMap<>();
confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true);
ensureFailedAdminRepl(getAtlasClause(confMap), true);
//Delete non recoverable marker to fix this
Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas");
ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true);
confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true);
ensureFailedAdminRepl(getAtlasClause(confMap), true);
//Delete non recoverable marker to fix this
baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName);
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true);
ensureFailedAdminRepl(getAtlasClause(confMap), true);
//Delete non recoverable marker to fix this
baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true);
ensureFailedAdminRepl(getAtlasClause(confMap), true);
//Delete non recoverable marker to fix this
baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
primary.dump(primaryDbName, getAtlasClause(confMap));
verifyAtlasMetadataPresent();
confMap.clear();
confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false);
ensureFailedAdminRepl(getAtlasClause(confMap), false);
//Delete non recoverable marker to fix this
baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas");
ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false);
confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false);
ensureFailedAdminRepl(getAtlasClause(confMap), false);
//Delete non recoverable marker to fix this
baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false);
ensureFailedAdminRepl(getAtlasClause(confMap), false);
//Delete non recoverable marker to fix this
baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR));
nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName);
Assert.assertNotNull(nonRecoverablePath);
baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true);
confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap));
}
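// Expects the dump/load to fail with REPL_FAILED_WITH_NON_RECOVERABLE_ERROR because a previous failure left a
// non-recoverable marker behind.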
private void ensureFailedAdminRepl(List<String> clause, boolean dump) throws Throwable {
try {
if (dump) {
primary.dump(primaryDbName, clause);
} else {
primary.load(replicatedDbName, primaryDbName, clause);
}
Assert.fail();
} catch (SemanticException e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(),
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
}
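// Expects a MalformedURLException for an invalid Atlas endpoint; the test fails if none is thrown.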
private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable {
try {
if (dump) {
primary.dump(primaryDbName, atlasClause);
} else {
primary.load(replicatedDbName, primaryDbName, atlasClause);
}
} catch (MalformedURLException e) {
return;
}
Assert.fail("Atlas endpoint is invalid but the test didn't fail: " + endpoint);
}
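// Checks that the dump directory contains the Hive dump root and the Atlas dump root with its export file and
// dump metadata, and that the first metadata line records the source fs.defaultFS and a zero second field.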
private void verifyAtlasMetadataPresent() throws IOException {
Path dbReplDir = new Path(primary.repldDir,
Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name())));
FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf());
assertTrue(fs.exists(dbReplDir));
FileStatus[] dumpRoots = fs.listStatus(dbReplDir);
assertEquals(1, dumpRoots.length);
Path dumpRoot = dumpRoots[0].getPath();
assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, ReplUtils.REPL_HIVE_BASE_DIR)));
Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR);
assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot));
assertTrue("Atlas export file doesn't exist",
fs.exists(new Path(atlasDumpRoot, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME)));
assertTrue("Atlas dump metadata doesn't exist",
fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)));
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(
fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset()));
String[] lineContents = br.readLine().split("\t", 5);
assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]);
assertEquals(0, Long.parseLong(lineContents[1]));
} finally {
if (br != null) {
br.close();
}
}
}
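// Expects the dump/load to fail because the given mandatory Atlas config is missing.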
private void ensureFailedReplOperation(List<String> clause, String conf, boolean dump) throws Throwable {
try {
if (dump) {
primary.dump(primaryDbName, clause);
} else {
primary.load(replicatedDbName, primaryDbName, clause);
}
Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
} catch (SemanticException e) {
assertEquals("Invalid config error : " + conf
+ " is mandatory config for Atlas metadata replication for atlas service.", e.getMessage());
}
}
private Map<String, String> defaultAtlasConfMap() {
Map<String, String> confMap = new HashMap<>();
confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName);
confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
return confMap;
}
private List<String> getAtlasClause(Map<String, String> confMap) {
List<String> confList = new ArrayList<>();
for (Map.Entry<String, String> entry:confMap.entrySet()) {
confList.add(quote(entry.getKey()) + "=" + quote(entry.getValue()));
}
return confList;
}
private String quote(String str) {
return "'" + str + "'";
}
private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPath) throws IOException {
FileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
}
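// WITH-clause entries that enable nameservice replacement and point the remote nameservice at NS_REMOTE.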
private List<String> getHdfsNameserviceClause() {
List<String> withClause = new ArrayList<>();
withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE.varname + "'='true'");
withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname + "'='"
+ NS_REMOTE + "'");
return withClause;
}
}