blob: 2a8d93d79ef0d9a68894ca03276e96f6eec046b2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.StringAppender;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertTrue;
/**
 * Replication tests built on BaseReplicationAcrossInstances. Each scenario performs an
 * incremental load on the replica, rewrites a database property on the replica with an
 * earlier replication id, loads again, and then inspects captured log output for a
 * 'Loading event ...' message to confirm that the event was replayed.
 */
public class TestReplicationIncrementalCheckpointing extends BaseReplicationAcrossInstances {
// Name of an extra primary-side database; assigned in setup() as 'extra_' + primaryDbName.
String extraPrimaryDb;
// Log4j2 configuration entry of the observed logger; looked up in setup().
private LoggerConfig loggerConfig;
// Log4j2 logger context obtained in setup().
private LoggerContext ctx;
// Level the observed logger had when setup() ran.
// NOTE(review): presumably kept so it can be restored after the test; the restore is not visible here.
private Level oldLevel;
// In-memory appender whose output the tests search for 'Loading event ...' messages.
private StringAppender appender;
// The logger being observed; setup() binds it to the name hive.ql.metadata.Hive.
private Logger logger;
/**
 * Class-level setup: builds the HiveConf overrides used by this suite and passes them to
 * internalBeforeClassSetup.
 *
 * NOTE(review): no annotation and no closing brace are visible for this method although
 * org.junit.BeforeClass is imported; they appear to have been lost from this listing.
 * Confirm against the original file.
 *
 * @throws Exception declared by the method; the visible body does not show which call raises it
 */
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
// Overrides, by config name: metadata-only dump off, external tables included,
// data copy tasks on target off, external-table purge default on.
overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
overrides.put(HiveConf.ConfVars.HIVE_EXTERNALTABLE_PURGE_DEFAULT.varname, "true");
// NOTE(review): TestReplicationScenarios.class is passed here rather than this class;
// confirm that this is intentional and not a copy-paste leftover.
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
/**
 * Per-test setup: attaches an in-memory StringAppender at DEBUG level to the
 * hive.ql.metadata.Hive logger so tests can search the emitted messages, and derives the
 * name of the extra primary database.
 *
 * NOTE(review): no annotation, no call to a superclass setup and no closing brace are
 * visible although org.junit.Before is imported; this listing appears to be incomplete.
 * Confirm against the original file.
 */
public void setup() throws Throwable {
logger = LogManager.getLogger("hive.ql.metadata.Hive");
// Remember the current level.
// NOTE(review): presumably restored in tearDown; that code is not visible here.
oldLevel = logger.getLevel();
ctx = (LoggerContext) LogManager.getContext(false);
Configuration config = ctx.getConfiguration();
loggerConfig = config.getLoggerConfig(logger.getName());
// The %m pattern keeps only the log message text in the captured output.
appender = StringAppender.createStringAppender("%m");
appender.addToLogger(logger.getName(), Level.DEBUG);
extraPrimaryDb = "extra_" + primaryDbName;
/**
 * Per-test teardown. The only statement fragment visible drops the extra primary database
 * (if it exists) with cascade.
 *
 * NOTE(review): this line is fused and truncated in this listing: the receiver and method
 * of the call that takes the drop-database string are missing, as is any logger cleanup
 * and the closing brace. Restore from the original file before compiling.
 */
public void tearDown() throws Throwable {"drop database if exists " + extraPrimaryDb + " cascade");
/**
 * External-table variant of the drop-table replay scenario.
 * NOTE(review): the body is not visible in this listing; presumably it calls
 * testDropEventReplay with an external table type. Confirm against the original file.
 */
public void testDropEventReplayExternalTable() throws Throwable {
/**
 * Managed-table variant of the drop-table replay scenario.
 * NOTE(review): the body is not visible in this listing; presumably it calls
 * testDropEventReplay with a managed table type. Confirm against the original file.
 */
public void testDropEventReplayManagedTable() throws Throwable {
/**
 * Verifies that replaying a drop-table event on the replica is harmless.
 *
 * Flow: bootstrap with an empty table t1; insert into and drop t1 on the primary; load on
 * the replica; rewrite a replica database property with (last replication id - 1); load
 * again and assert that the log contains 'Loading event EVENT_DROP_TABLE'. The table is
 * then recreated and the same drop/replay cycle is repeated.
 *
 * NOTE(review): several statements appear to be missing from this listing: the call that
 * terminates each primary chain and yields the Tuple, the statements that should follow
 * the 'Delete the load ACK' and 'Reset the logger' comments, and the terminators of some
 * replica chains. The DBPROPERTIES key is also an empty string here. Confirm against the
 * original file.
 *
 * @param tableType table-type keyword spliced into the create-table statement
 */
public void testDropEventReplay(String tableType) throws Throwable {
// Create an empty table and do a dump & load.
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType + " table t1 (id int)")
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.run("repl status " + replicatedDbName)
// Insert some data and drop the table, do a dump & load cycle post that
tuple = primary
.run("use " + primaryDbName)
.run("insert into table t1 values(1),(2)")
.run("drop table t1")
// Change the repl id in the database to one before the drop table event.
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
// verifyFailure is used here to check that t1 is no longer listed on the replica.
.verifyFailure(new String[] {"t1"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1"})
.run("repl status " + replicatedDbName)
// Check the log if the event got replayed.
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_DROP_TABLE"));
// Create table with same name again and dump, to see drop event replay doesn't bother anything further.
tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType + " table t1 (id int)")
.run("insert into table t1 values(1),(2)")
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"})
.run("select id from t1")
.verifyResults(new String[] {"1", "2"})
.run("repl status " + replicatedDbName)
// Add some more data and delete the table & then try checkpointing.
tuple = primary
.run("use " + primaryDbName)
.run("insert into table t1 values(3)")
.run("drop table t1")
// Change repl id to one before the last replication id, i.e. before the drop table event.
replId = Integer.parseInt(tuple.lastReplicationId) - 1;
// Check whether the table is deleted
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
// Check whether the table stays deleted and no failure.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1"})
.run("repl status " + replicatedDbName)
// Check the log if the event got replayed.
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_DROP_TABLE"));
/**
 * External-table variant of the create-table replay scenario.
 * NOTE(review): the body is not visible in this listing; presumably it calls
 * testCreateEventReplay with an external table type. Confirm against the original file.
 */
public void testCreateEventReplayExternalTable() throws Throwable {
/**
 * Managed-table variant of the create-table replay scenario.
 * NOTE(review): the body is not visible in this listing; presumably it calls
 * testCreateEventReplay with a managed table type. Confirm against the original file.
 */
public void testCreateEventReplayManagedTable() throws Throwable {
/**
 * Verifies that replaying a create-table event leaves the table present on the replica.
 *
 * Flow: an initial dump and load; create t1 on the primary and load; rewrite a replica
 * database property with (last replication id - 1); load again and assert that the log
 * contains 'Loading event EVENT_CREATE_TABLE'. The cycle is repeated after dropping t1
 * and recreating it with a different column.
 *
 * NOTE(review): the call that terminates each primary chain, the statements after the
 * 'Delete the load ACK' and 'Reset the logger' comments, and some chain terminators are
 * not visible in this listing, and the DBPROPERTIES key is an empty string. Confirm
 * against the original file.
 *
 * @param tableType table-type keyword spliced into the create-table statements
 */
public void testCreateEventReplay(String tableType) throws Throwable {
// Do an empty dump & load to move to incremental mode.
WarehouseInstance.Tuple tuple = primary
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
// Create a table and do a dump & load.
tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType + " table t1 (id int)")
// Replication id to write back on the replica: one before the last dumped id.
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"});
// The second load must have logged the create-table event again.
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_CREATE_TABLE"));
// Drop & create a table with different schema.
tuple = primary
.run("use " + primaryDbName)
.run("drop table t1")
.run("create " + tableType + " table t1 (name string)")
replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResults(new String[] {"t1"});
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_CREATE_TABLE"));
/**
 * Verifies that replaying insert events does not change the rows seen on the replica.
 *
 * Flow: an initial dump and load; create t1 and insert two rows on the primary, then load;
 * rewrite a replica database property with (last replication id - 3); load again, check
 * that the same rows are returned and that the log contains 'Loading event EVENT_INSERT'.
 * The cycle is repeated after dropping t1 and recreating it with a string column.
 *
 * NOTE(review): the call that terminates each primary chain, the statements after the
 * 'Delete the load ACK' and 'Reset the logger' comments, and some chain terminators are
 * not visible in this listing, and the DBPROPERTIES key is an empty string. Confirm
 * against the original file.
 */
public void testInsertEventReplay() throws Throwable {
// Do an empty dump & load to move to incremental mode.
WarehouseInstance.Tuple tuple = primary
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
// Create a table, insert two rows with one statement, then dump & load.
tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("insert into table t1 values(1),(2)")
// Step back three ids rather than one.
// NOTE(review): presumably because the insert spans several events; confirm the offset.
int replId = Integer.parseInt(tuple.lastReplicationId) - 3;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select id from t1")
.verifyResults(new String[] {"1", "2"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select id from t1")
// Same two rows as before the replay: the rows were not duplicated.
.verifyResults(new String[] {"1", "2"});
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_INSERT"));
// Drop & create a table with different schema.
tuple = primary
.run("use " + primaryDbName)
.run("drop table t1")
.run("create table t1 (name string)")
.run("insert into table t1 values ('one'),('two')")
replId = Integer.parseInt(tuple.lastReplicationId) - 3;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select name from t1")
.verifyResults(new String[] {"one", "two"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select name from t1")
.verifyResults(new String[] {"one", "two"});
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_INSERT"));
/**
 * External-table variant of the alter-table replay scenario.
 * NOTE(review): the body is not visible in this listing; presumably it calls
 * testAlterTableReplay with an external table type. Confirm against the original file.
 */
public void testAlterTableReplayExternalTable() throws Throwable {
/**
 * Managed-table variant of the alter-table replay scenario.
 * NOTE(review): the body is not visible in this listing; presumably it calls
 * testAlterTableReplay with a managed table type. Confirm against the original file.
 */
public void testAlterTableReplayManagedTable() throws Throwable {
/**
 * Verifies replay of three table-level operations: setting a table property, renaming the
 * table, and truncating it.
 *
 * For the first two operations the replica is loaded, a replica database property is
 * rewritten with (last replication id - 1), the load is repeated, and the log is checked
 * for 'Loading event EVENT_ALTER_TABLE' and 'Loading event EVENT_RENAME_TABLE'. The
 * truncate section repeats the load but has no log assertion in the visible code.
 *
 * NOTE(review): the call that terminates each primary chain, the statements after the
 * 'Delete the load ACK' and 'Reset the logger' comments, and some chain terminators are
 * not visible in this listing, and the DBPROPERTIES key is an empty string. Confirm
 * against the original file.
 *
 * @param tableType table-type keyword spliced into the create-table statement
 */
public void testAlterTableReplay(String tableType) throws Throwable {
// Create a table with a single int column (it is not partitioned).
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType +" table t1 (id int)")
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
// Do an alter table operation.
tuple = primary
.run("use " + primaryDbName)
.run("alter table t1 set tblproperties('dummy_key'='dummy_val')")
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tblproperties t1('dummy_key')")
.verifyResults(new String[] { "dummy_key\tdummy_val" })
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tblproperties t1('dummy_key')")
.verifyResults(new String[] { "dummy_key\tdummy_val" });
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_ALTER_TABLE"));
// Try Alter table rename.
tuple = primary
.run("use " + primaryDbName)
.run("alter table t1 RENAME TO t1_renamed")
replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
// The old name must be gone and the new name present.
.verifyFailure(new String[] {"t1"})
.run("show tables like 't1_renamed'")
.verifyResults(new String[] {"t1_renamed"})
.run("repl status " + replicatedDbName)
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1"})
.run("show tables like 't1_renamed'")
.verifyResults(new String[] {"t1_renamed"});
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_RENAME_TABLE"));
// Test truncate table.
// Add some data and do a dump & load to the table
// NOTE(review): the receiver of the following chain is not visible in this listing;
// presumably it is primary. Confirm against the original file.
.run("use " + primaryDbName)
.run("insert into table t1_renamed values (1),(2),(3),(4)")
// Load the inserted rows on the replica and check that all four arrived.
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select id from t1_renamed")
.verifyResults(new String[] {"1", "2", "3", "4"});
// Truncate the table
tuple = primary
.run("use " + primaryDbName)
.run("truncate table t1_renamed")
replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
// NOTE(review): show tables is issued before the use statement in this chain, so it
// runs against whatever database the replica session currently has selected.
.run("show tables like 't1_renamed'")
.verifyResults(new String[] {"t1_renamed"})
.run("use " + replicatedDbName)
.run("select id from t1_renamed")
// After the truncate none of the previously inserted rows should be returned.
.verifyFailure(new String[] {"1", "2", "3", "4"})
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("show tables like 't1_renamed'")
.verifyResults(new String[] {"t1_renamed"})
.run("use " + replicatedDbName)
.run("select id from t1_renamed")
.verifyFailure(new String[] {"1", "2", "3", "4"});
// NOTE(review): unlike the sections above, no log assertion for the truncate replay is
// visible here; confirm whether one exists in the original file.
/**
 * Verifies that replaying an add-partition event leaves the partition in place.
 *
 * Flow: bootstrap a partitioned table t1; add partition country=india on the primary and
 * load; rewrite a replica database property with (last replication id - 1); load again
 * and assert that the log contains 'Loading event EVENT_ADD_PARTITION'.
 *
 * NOTE(review): the call that terminates each primary chain, the statements after the
 * 'Delete the load ACK' and 'Reset the logger' comments, and some chain terminators are
 * not visible in this listing, and the DBPROPERTIES key is an empty string. Confirm
 * against the original file.
 */
public void testAddPartitionReplay() throws Throwable {
// Empty table type: the create statement below carries no table-type keyword.
String tableType = "";
// create a partitioned table and do a dump & load.
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType +" table t1 (place string) partitioned by (country string)")
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
// Do a add partition operation.
tuple = primary
.run("use " + primaryDbName)
.run("alter table t1 add partition(country='india')")
.run("show partitions t1")
.verifyResults(new String[] {"country=india"})
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show partitions t1")
.verifyResults(new String[] {"country=india" })
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show partitions t1")
// The partition is still listed exactly once after the replay.
.verifyResults(new String[] {"country=india" });
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_ADD_PARTITION"));
/**
 * Verifies that replaying a drop-partition event is harmless.
 *
 * Flow: bootstrap a partitioned table t1 with partition country=india; drop the partition
 * on the primary and load; rewrite a replica database property with
 * (last replication id - 1); load again and assert that the log contains
 * 'Loading event EVENT_DROP_PARTITION'.
 *
 * NOTE(review): the call that terminates each primary chain, the statements after the
 * 'Delete the load ACK' and 'Reset the logger' comments, and some chain terminators are
 * not visible in this listing, and the DBPROPERTIES key is an empty string. Confirm
 * against the original file.
 */
public void testDropPartitionReplay() throws Throwable {
// Empty table type: the create statement below carries no table-type keyword.
String tableType = "";
// create a partitioned table and do a dump & load, with a partition
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType +" table t1 (place string) partitioned by (country string)")
.run("alter table t1 add partition(country='india')")
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.run("show partitions t1")
.verifyResults(new String[] {"country=india" });
// Do a drop partition operation.
tuple = primary
.run("use " + primaryDbName)
.run("alter table t1 drop partition(country='india')")
.run("show partitions t1")
// The partition must no longer be listed on the primary.
.verifyFailure(new String[] {"country=india"})
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show partitions t1")
.verifyFailure(new String[] {"country=india" })
.run("alter database "+ replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show partitions t1")
.verifyFailure(new String[] {"country=india"});
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_DROP_PARTITION"));
/**
 * Verifies replay of partition-level alterations: changing a partition location (only
 * when a table type is set) and renaming a partition from country=india to country=usa.
 * After each replayed load the log is checked for 'Loading event EVENT_ALTER_PARTITION'
 * and 'Loading event EVENT_RENAME_PARTITION' respectively.
 *
 * NOTE(review): this method is heavily damaged in this listing. The ternary that computes
 * whLocation is cut off, the two tuple assignments have lost their receiver, both
 * location assertions are truncated, and braces, primary-chain terminators and the
 * statements after the 'Delete the load ACK' and 'Reset the logger' comments are missing.
 * The DBPROPERTIES key is also an empty string. Restore from the original file.
 */
public void testAlterPartitionReplay() throws Throwable {
// Empty table type: the create statement below carries no table-type keyword.
String tableType = "";
// create a partitioned table and do a dump & load, with a partition
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create " + tableType +" table t1 (place string) partitioned by (country string)")
.run("alter table t1 add partition(country='india')")
// With an empty table type the database's managed location is used as the base path.
// NOTE(review): the else branch of this ternary is cut off in this listing.
String whLocation = tableType.isEmpty() ? primary.getDatabase(primaryDbName).getManagedLocationUri() : primary
Path whPath = new Path(whLocation);
Path newPartPath = new Path(whPath, "newLoc");
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.run("show partitions t1")
.verifyResults(new String[] {"country=india" });
// Do an alter partition operation to change location of the table.
// NOTE(review): tableType is assigned the empty string above and is not reassigned in
// the visible code, so this condition is false and the guarded section cannot run as
// shown. The end of the guarded section is not visible; since replId is declared again
// further down, the first declaration is presumably inside it.
if (!tableType.isEmpty()) {
tuple ="use " + primaryDbName)
.run("alter table t1 partition(country='india') set location '" + newPartPath + "'")
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName).run("show partitions t1")
.verifyResults(new String[] { "country=india" })
.run("alter database " + replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// NOTE(review): the condition checked on the partition location is cut off in this listing.
assertTrue(replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india")).getSd().getLocation()
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show partitions t1")
.verifyResults(new String[] { "country=india" });
assertTrue(replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india")).getSd().getLocation()
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_ALTER_PARTITION"));
// Do a rename partition and see if things stay same.
tuple ="use " + primaryDbName)
.run("alter table t1 partition (country='india') rename to partition(country='usa')")
int replId = Integer.parseInt(tuple.lastReplicationId) - 1;
replica.load(replicatedDbName, primaryDbName)
// NOTE(review): show partitions is issued twice in a row in this chain; the first
// result is not verified, so the duplicate looks redundant.
.run("use " + replicatedDbName).run("show partitions t1")
.run("show partitions t1")
.verifyResults(new String[] { "country=usa" })
.run("alter database " + replicatedDbName + " set DBPROPERTIES(''='" + replId + "')");
// Delete the load ACK.
// Reset the logger
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName).run("show partitions t1")
.run("show partitions t1")
.verifyResults(new String[] { "country=usa" });
assertTrue(appender.getOutput(), appender.getOutput().contains("Loading event EVENT_RENAME_PARTITION"));
/**
 * Deletes the load acknowledgement file of a dump so that the same dump can be loaded
 * again.
 *
 * The file is resolved as dumpLocation / ReplUtils.REPL_HIVE_BASE_DIR /
 * ReplAck.LOAD_ACKNOWLEDGEMENT and removed through the FileSystem obtained for that
 * directory with the primary instance's configuration.
 *
 * NOTE(review): java.io.IOException is not among the imports visible in this listing, and
 * the closing braces of this method and of the class are not visible either. Confirm
 * against the original file.
 *
 * @param tuple dump result whose dumpLocation identifies the dump directory
 * @throws IOException if obtaining the file system or deleting the file fails
 */
private void deleteLoadAck(WarehouseInstance.Tuple tuple) throws IOException {
Path hiveDumpDir = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
Path loadAck = new Path(hiveDumpDir, ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
// The return value of delete is ignored, so a missing ACK file is not treated as an error.
fs.delete(loadAck, true);