// blob: 4cd0541cf2f65be5adc2baa3f3cf2db88df38e49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.exec.repl.ReplicationMigrationTool;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestReplicationMigrationTool extends BaseReplicationAcrossInstances {
/** Name of an additional primary-side database; assigned in setup() and dropped in tearDown(). */
String extraPrimaryDb;
/** In-memory buffer backing the PrintStream that redirectStream() installs as System.out. */
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
/** In-memory buffer backing the PrintStream that redirectStream() installs as System.err. */
private final ByteArrayOutputStream err = new ByteArrayOutputStream();
/**
 * One-time class setup: assembles the configuration overrides for this suite and passes them to
 * {@code internalBeforeClassSetup}.
 *
 * @throws Exception if the current user cannot be resolved or the delegated setup fails
 */
@BeforeClass
public static void classLevelSetup() throws Exception {
  final String messageEncoder = GzipJSONMessageEncoder.class.getCanonicalName();
  final String currentUser = UserGroupInformation.getCurrentUser().getUserName();

  HashMap<String, String> confOverrides = new HashMap<>();
  // Metastore event messages are encoded with the gzip JSON encoder.
  confOverrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), messageEncoder);
  // Dump data as well as metadata, and include external tables.
  confOverrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
  confOverrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
  // DistCp runs as the user executing the tests.
  confOverrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
  confOverrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");

  // NOTE(review): TestReplicationScenarios.class is passed here rather than this class - confirm that is intended.
  internalBeforeClassSetup(confOverrides, TestReplicationScenarios.class);
}
/**
 * Per-test setup: redirects the standard streams into the in-memory buffers, runs the base-class setup and then
 * derives the extra database name from {@code primaryDbName}.
 *
 * @throws Throwable if the base-class setup fails
 */
@Before
public void setup() throws Throwable {
  redirectStream();
  super.setup();
  final String extraDbPrefix = "extra_";
  this.extraPrimaryDb = extraDbPrefix + primaryDbName;
}
/**
 * Per-test cleanup: drops the extra primary database, runs the base-class tear down and finally resets the
 * captured stream state.
 *
 * <p>The stream reset sits in a {@code finally} block so that it still happens when dropping the database or the
 * base-class tear down throws.
 *
 * @throws Throwable if dropping the database or the base-class tear down fails
 */
@After
public void tearDown() throws Throwable {
  try {
    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
    super.tearDown();
  } finally {
    resetStream();
  }
}
/** System.out as it was before redirectStream() replaced it; null while no redirection is active. */
private PrintStream savedSystemOut;
/** System.err as it was before redirectStream() replaced it; null while no redirection is active. */
private PrintStream savedSystemErr;

/**
 * Points System.out and System.err at the in-memory buffers so that the tool's console output can be asserted
 * on. The previous streams are remembered so that resetStream() can put them back.
 */
private void redirectStream() {
  savedSystemOut = System.out;
  savedSystemErr = System.err;
  System.setOut(new PrintStream(out));
  System.setErr(new PrintStream(err));
}

/**
 * Restores the streams that were active before redirectStream() ran (if any) and clears both capture buffers.
 * Without the restore, the process-wide streams would keep pointing at this test's buffers after the test ends.
 */
private void resetStream() {
  if (savedSystemOut != null) {
    System.setOut(savedSystemOut);
    savedSystemOut = null;
  }
  if (savedSystemErr != null) {
    System.setErr(savedSystemErr);
    savedSystemErr = null;
  }
  out.reset();
  err.reset();
}
/**
 * Runs the verification after a bootstrap dump, after the bootstrap load and after an incremental dump & load,
 * expecting success each time.
 */
@Test
public void testSuccessfulSync() throws Throwable {
  List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);

  // Bootstrap data: table1 receives three inserts, table2 is partitioned and populated, table3 stays empty.
  WarehouseInstance.Tuple bootstrapDump = primary
      .run("use " + primaryDbName)
      .run("create external table table1 (id int)")
      .run("insert into table table1 values (100)")
      .run("insert into table table1 values (200),(300),(400)")
      .run("insert into table table1 values (500),(600)")
      .run("create external table table2 (place string) partitioned by (country string)")
      .run("insert into table table2 partition(country='india') values ('chennai')")
      .run("insert into table table2 partition(country='india') values ('jaipur'),('udaipur'),('kota')")
      .run("insert into table table2 partition(country='us') values ('new york')")
      .run("insert into table table2 partition(country='france') values ('paris')")
      .run("create external table table3 (id int) partitioned by (department string)")
      .dump(primaryDbName, withClause);

  // Verification is run once right after the dump ...
  verifySuccessfulResult(bootstrapDump);

  replica
      .load(replicatedDbName, primaryDbName, withClause)
      .run("repl status " + replicatedDbName)
      .verifyResult(bootstrapDump.lastReplicationId)
      .verifyReplTargetProperty(replicatedDbName);

  // ... and once more after the load.
  verifySuccessfulResult(bootstrapDump);

  // Add more rows and a new table, then confirm that verification also succeeds after the incremental cycle.
  WarehouseInstance.Tuple incrementalDump = primary
      .run("use " + primaryDbName)
      .run("create external table table4 (id int) partitioned by (name string)")
      .run("insert into table table1 values (2000)")
      .run("insert into table table1 values (880),(660),(440)")
      .run("insert into table table2 partition(country='nepal') values ('kathmandu'),('pokhra')")
      .run("insert into table table2 partition(country='india') values ('lucknow'),('kanpur'),('agra'),('varanasi'),"
          + "('delhi')")
      .run("insert into table table3 partition(department='engg') values (10),(40),(50),(60)")
      .run("insert into table table3 partition(department='support') values (20),(350)")
      .run("insert into table table3 partition(department='support') values (20),(350)")
      .run("insert into table table4 partition(name='abc') values (120),(3150)")
      .dump(primaryDbName, withClause);

  replica
      .load(replicatedDbName, primaryDbName, withClause)
      .run("use " + replicatedDbName)
      .verifyReplTargetProperty(replicatedDbName);

  verifySuccessfulResult(incrementalDump);
}
/**
 * Checks that the tool reports a mismatch when a file is added to, or deleted from, the source table location
 * after replication, and that it reports success again after a further dump & load cycle.
 */
@Test
public void testExtraAndMissingFilesAtSource() throws Throwable {
  List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
  DistributedFileSystem dfs = primary.miniDFSCluster.getFileSystem();
  Path tableADir = new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
  dfs.mkdirs(tableADir, new FsPermission("777"));

  // Seed the table location with a file written directly to the file system.
  try (FSDataOutputStream writer = dfs.create(new Path(tableADir, "file1.txt"))) {
    writer.write("1,2\n".getBytes());
    writer.write("13,21\n".getBytes());
  }

  WarehouseInstance.Tuple dumpResult = primary
      .run("use " + primaryDbName)
      .run("create external table tablea (i int, j int) row format delimited fields terminated by ',' " + "location '"
          + tableADir.toUri() + "'")
      .run("create external table tableb (id int)")
      .run("insert into table tableb values (25)")
      .run("insert into table tableb values (28),(36),(42)")
      .dump(primaryDbName, withClause);

  replica
      .load(replicatedDbName, primaryDbName, withClause)
      .run("use " + replicatedDbName)
      .verifyReplTargetProperty(replicatedDbName);

  verifySuccessfulResult(dumpResult);

  // Write a second file directly into the source location; it has not been replicated.
  try (FSDataOutputStream writer = dfs.create(new Path(tableADir, "file2.txt"))) {
    writer.write("10,20\n".getBytes());
    writer.write("15,27\n".getBytes());
  }

  ReplicationMigrationTool tool = new ReplicationMigrationTool();
  verifyFailures(dumpResult, tool, "Directory Size mismatch", "Extra entry at source", "Extra entry at source");

  // Another dump & load cycle brings source and target back in sync, so verification must succeed.
  dumpResult = doAnEmptyDumpAndLoadCycle(withClause);
  verifySuccessfulResult(dumpResult);

  // Remove a file from the source and verify again.
  dfs.delete(new Path(tableADir, "file1.txt"), true);
  verifyFailures(dumpResult, tool, "Directory Size mismatch", "Source Or Target has an extra/less files",
      "Source Or Target has an extra/less files.");

  // Another dump & load cycle brings source and target back in sync, so verification must succeed.
  dumpResult = doAnEmptyDumpAndLoadCycle(withClause);
  verifySuccessfulResult(dumpResult);
}
/**
 * Checks that the tool detects (a) a file whose content changed but whose length did not, (b) a file whose length
 * changed, and (c) a file that is still open for write at the source.
 *
 * <p>In the open-file section the test fails only when the tool returns exit code 0 (the value
 * {@code verifySuccessfulResult} expects on success), the captured output is asserted before the buffers are
 * cleared, and the append stream is closed even when an assertion fails.
 */
@Test
public void testModifiedContentAndOpenFiles() throws Throwable {
  List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
  Path externalTableLocationa = new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
  DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
  fs.mkdirs(externalTableLocationa, new FsPermission("777"));
  Path externalTableLocationb = new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "b");
  fs.mkdirs(externalTableLocationb, new FsPermission("777"));

  // Externally add data to both locations.
  try (FSDataOutputStream outputStream = fs.create(new Path(externalTableLocationa, "filea1.txt"))) {
    outputStream.write("1,2\n".getBytes());
    outputStream.write("13,21\n".getBytes());
  }
  try (FSDataOutputStream outputStream = fs.create(new Path(externalTableLocationb, "fileb1.txt"))) {
    outputStream.write("1,2\n".getBytes());
    outputStream.write("13,21\n".getBytes());
  }

  WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
      .run("create external table tablea (i int, j int) row format delimited fields terminated by ',' "
          + "location '" + externalTableLocationa.toUri() + "'")
      .run("create external table tableb (i int, j int) row format delimited fields terminated by ',' "
          + "location '" + externalTableLocationb.toUri() + "'")
      .run("insert into table tableb values (25,26)")
      .run("insert into table tableb values (28,29),(36,37),(42,43)")
      .dump(primaryDbName, withClause);

  replica.load(replicatedDbName, primaryDbName, withClause).run("use " + replicatedDbName)
      .verifyReplTargetProperty(replicatedDbName);

  // Everything is in sync, script should return success.
  verifySuccessfulResult(tuple);

  // Overwrite one file with different content of the same length ("1,2" becomes "1,4").
  try (FSDataOutputStream outputStream = fs.create(new Path(externalTableLocationb, "fileb1.txt"))) {
    outputStream.write("1,4\n".getBytes());
    outputStream.write("13,21\n".getBytes());
  }

  // The length is unchanged, so it is the checksum verification that is expected to catch this.
  ReplicationMigrationTool replTool = new ReplicationMigrationTool();
  validateChecksumValidationFails(replTool, tuple, "File Checksum mismatch");

  // Do a dump & load cycle and check everything should be in sync.
  tuple = doAnEmptyDumpAndLoadCycle(withClause);
  verifySuccessfulResult(tuple);

  // Overwrite a file with content of a different length.
  try (FSDataOutputStream outputStream = fs.create(new Path(externalTableLocationa, "filea1.txt"))) {
    outputStream.write("1,2\n".getBytes());
    outputStream.write("13,21\n".getBytes());
    outputStream.write("25,50\n".getBytes());
  }
  validateFileLevelValidationFails(replTool, tuple, "File Size mismatch");
  validateChecksumValidationFails(replTool, tuple, "File Size mismatch");

  // Do a dump & load cycle and check everything should be in sync.
  tuple = doAnEmptyDumpAndLoadCycle(withClause);
  verifySuccessfulResult(tuple);

  // Keep a file open for append and check that the verify-open-files option reports it.
  try (FSDataOutputStream openStream = fs.append(new Path(externalTableLocationa, "filea1.txt"))) {
    int exitCode = ToolRunner.run(conf, replTool,
        new String[] { "-dumpFilePath", tuple.dumpLocation, "-fileLevelCheck", "-verifyOpenFiles" });
    if (exitCode == 0) {
      fail("Script didn't fail despite having an open file.");
    }

    // Make sure the open-file error was reported.
    assertTrue(err.toString(), err.toString().contains("There are open files"));

    // Confirm that successful message is not printed, and failure message is printed.
    assertFalse(out.toString(), out.toString().contains("Completed verification. Source & Target are in Sync."));
    assertTrue(out.toString(), out.toString().contains("Completed verification. Source & Target are not in Sync."));

    // Clear the buffers only after they have been inspected.
    err.reset();
    out.reset();
  }

  // The file is closed now, so the script should return success.
  verifySuccessfulResult(tuple);
}
/**
 * Checks that files excluded from the copy through a DistCp filter file are not reported as a mismatch when the
 * same pattern is passed to the tool through {@code -filters}.
 *
 * <p>The filter file is written inside try-with-resources so that the writer is closed even if the write fails.
 */
@Test
public void testFilters() throws Throwable {
  List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
  Path externalTableLocation = new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
  DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
  fs.mkdirs(externalTableLocation, new FsPermission("777"));

  // Externally add data to location.
  try (FSDataOutputStream outputStream = fs.create(new Path(externalTableLocation, "file1.txt"))) {
    outputStream.write("1,2\n".getBytes());
    outputStream.write("13,21\n".getBytes());
  }

  // Create two files whose names match the filter pattern, so they should be filtered out.
  Path random1 = new Path(externalTableLocation, "randomStuff1");
  DFSTestUtil.createFile(fs, random1, 1024L, (short) 1, 1024L);
  Path random2 = new Path(externalTableLocation, "randomStuff2");
  DFSTestUtil.createFile(fs, random2, 1024L, (short) 1, 1024L);

  // Create a filter file for DistCp.
  String filterFilePath = "/tmp/filter";
  try (FileWriter myWriter = new FileWriter(filterFilePath)) {
    myWriter.write(".*randomStuff.*");
  }
  withClause.add("'distcp.options.filters'='" + filterFilePath + "'");

  WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
      .run("create external table tablea (i int, j int) row format delimited fields terminated by ',' "
          + "location '" + externalTableLocation.toUri() + "'")
      .run("insert into table tablea values (215,226)")
      .run("insert into table tablea values (281,229),(336,347),(542,453)")
      .run("create external table tableb (id int)")
      .run("insert into table tableb values (15)")
      .run("insert into table tableb values (24),(26),(82)")
      .dump(primaryDbName, withClause);

  replica.load(replicatedDbName, primaryDbName, withClause).run("use " + replicatedDbName)
      .verifyReplTargetProperty(replicatedDbName);

  // Check that the filtered files did not get copied.
  assertFalse(fs.exists(new Path(REPLICA_EXTERNAL_BASE, random1.toUri().getPath().replaceFirst("/", ""))));
  assertFalse(fs.exists(new Path(REPLICA_EXTERNAL_BASE, random2.toUri().getPath().replaceFirst("/", ""))));

  ReplicationMigrationTool replTool = new ReplicationMigrationTool();

  // Verify at file level.
  assertEquals(0, ToolRunner.run(conf, replTool,
      new String[] { "-dumpFilePath", tuple.dumpLocation, "-fileLevelCheck", "-filters", ".*randomStuff.*" }));
  assertTrue(out.toString(), out.toString().contains("Completed verification. Source & Target are in Sync."));
  out.reset();

  // Verify at file level, with checksum.
  assertEquals(0, ToolRunner.run(conf, replTool,
      new String[] { "-dumpFilePath", tuple.dumpLocation, "-fileLevelCheck", "-verifyChecksum", "-filters",
          ".*randomStuff.*" }));
  assertTrue(out.toString(), out.toString().contains("Completed verification. Source & Target are in Sync."));
  out.reset();
}
/**
 * Dumps the primary database without issuing any new statements beyond {@code use}, loads that dump on the replica
 * and checks the replica's target property.
 *
 * @param withClause clause entries passed to both the dump and the load
 * @return the tuple produced by the dump
 * @throws Throwable if the dump, the load or the property check fails
 */
private WarehouseInstance.Tuple doAnEmptyDumpAndLoadCycle(List<String> withClause) throws Throwable {
  WarehouseInstance.Tuple dumpResult = primary
      .run("use " + primaryDbName)
      .dump(primaryDbName, withClause);

  replica
      .load(replicatedDbName, primaryDbName, withClause)
      .run("use " + replicatedDbName)
      .verifyReplTargetProperty(replicatedDbName);

  return dumpResult;
}
/**
 * Runs the tool with {@code -fileLevelCheck -verifyChecksum} and asserts that it reports a failure.
 *
 * <p>The test fails if the tool returns exit code 0, the value {@code verifySuccessfulResult} expects on success.
 * Otherwise the captured error output must contain {@code message} and the captured standard output must carry
 * the "not in Sync" summary and not the "in Sync" one. Both capture buffers are cleared afterwards.
 *
 * @param replTool the tool instance to run
 * @param tuple dump result whose {@code dumpLocation} is passed as {@code -dumpFilePath}
 * @param message text expected in the captured error output
 * @throws Exception if running the tool throws
 */
private void validateChecksumValidationFails(ReplicationMigrationTool replTool, WarehouseInstance.Tuple tuple,
    String message) throws Exception {
  int exitCode = ToolRunner.run(conf, replTool,
      new String[] { "-dumpFilePath", tuple.dumpLocation, "-fileLevelCheck", "-verifyChecksum" });
  // Only a successful exit code means the mismatch went undetected.
  if (exitCode == 0) {
    fail("Script didn't fail despite having an extra file.");
  }

  // Make sure the expected error was reported.
  assertTrue(err.toString(), err.toString().contains(message));

  // Confirm that successful message is not printed and failure message is printed.
  assertFalse(out.toString(), out.toString().contains("Completed verification. Source & Target are in Sync."));
  assertTrue(out.toString(), out.toString().contains("Completed verification. Source & Target are not in Sync."));
  out.reset();
  err.reset();
}
/**
 * Asserts that the tool reports a failure at directory level, at file level and at file level with checksum
 * verification.
 *
 * @param tuple dump result whose {@code dumpLocation} is passed as {@code -dumpFilePath}
 * @param replTool the tool instance to run
 * @param dirLevelFailureMsg text expected in the error output of the directory-level run
 * @param fileLevelFailureMsg text expected in the error output of the file-level run
 * @param checksumLevelFailureMsg text expected in the error output of the checksum run
 * @throws Exception if running the tool throws
 */
private void verifyFailures(WarehouseInstance.Tuple tuple, ReplicationMigrationTool replTool,
    String dirLevelFailureMsg, String fileLevelFailureMsg, String checksumLevelFailureMsg) throws Exception {
  // Verify failures at directory level.
  ToolRunner.run(conf, replTool, new String[] { "-dumpFilePath", tuple.dumpLocation, "-dirLevelCheck" });

  // Make sure the expected error was reported; the captured error output is the failure message, as in the
  // file-level and checksum helpers.
  assertTrue(err.toString(), err.toString().contains(dirLevelFailureMsg));

  // Confirm that successful message is not printed and failure message is printed.
  assertFalse(out.toString(), out.toString().contains("Completed verification. Source & Target are in Sync."));
  assertTrue(out.toString(), out.toString().contains("Completed verification. Source & Target are not in Sync."));
  out.reset();
  err.reset();

  // Verify failures at file level.
  validateFileLevelValidationFails(replTool, tuple, fileLevelFailureMsg);

  // Verify failures at file level with checksum.
  validateChecksumValidationFails(replTool, tuple, checksumLevelFailureMsg);
}
/**
 * Runs the tool with {@code -fileLevelCheck} and asserts on its captured output: the error output must contain
 * {@code fileLevelFailureMsg}, and the standard output must carry the "not in Sync" summary and not the "in Sync"
 * one. Both capture buffers are cleared afterwards.
 *
 * @param replTool the tool instance to run
 * @param tuple dump result whose {@code dumpLocation} is passed as {@code -dumpFilePath}
 * @param fileLevelFailureMsg text expected in the captured error output
 * @throws Exception if running the tool throws
 */
private void validateFileLevelValidationFails(ReplicationMigrationTool replTool, WarehouseInstance.Tuple tuple,
    String fileLevelFailureMsg) throws Exception {
  String[] toolArgs = { "-dumpFilePath", tuple.dumpLocation, "-fileLevelCheck" };
  ToolRunner.run(conf, replTool, toolArgs);

  // The expected error text must have been written to the error stream.
  String capturedErr = err.toString();
  assertTrue(capturedErr, capturedErr.contains(fileLevelFailureMsg));

  // Only the failure summary may appear on standard output.
  String capturedOut = out.toString();
  assertFalse(capturedOut, capturedOut.contains("Completed verification. Source & Target are in Sync."));
  assertTrue(capturedOut, capturedOut.contains("Completed verification. Source & Target are not in Sync."));

  out.reset();
  err.reset();
}
/**
 * Runs the tool three times (directory level, file level, file level with checksum) and expects exit code 0 and a
 * completion message on standard output each time.
 *
 * <p>The three runs also exercise the three ways {@code -dumpFilePath} is given here: the full path of the
 * external file list, the dump location with {@code /hive} appended, and the bare dump location.
 *
 * @param tuple dump result whose {@code dumpLocation} is used to build {@code -dumpFilePath}
 * @throws Exception if running the tool throws
 */
private void verifySuccessfulResult(WarehouseInstance.Tuple tuple) throws Exception {
  ReplicationMigrationTool replTool = new ReplicationMigrationTool();

  // One argument list per run, paired by index with the output text expected from that run.
  String[][] toolArgsPerRun = {
      { "-dumpFilePath", tuple.dumpLocation + "/hive/_file_list_external", "-dirLevelCheck" },
      { "-dumpFilePath", tuple.dumpLocation + "/hive", "-fileLevelCheck" },
      { "-dumpFilePath", tuple.dumpLocation, "-fileLevelCheck", "-verifyChecksum" } };
  String[] expectedOutputPerRun = {
      "Completed verification",
      "Completed verification.",
      "Completed verification. Source & Target are in Sync." };

  for (int run = 0; run < toolArgsPerRun.length; run++) {
    assertEquals(0, ToolRunner.run(conf, replTool, toolArgsPerRun[run]));
    assertTrue(out.toString().contains(expectedOutputPerRun[run]));
    // NOTE(review): this asserts that the error output is NOT empty on a successful run - confirm that is
    // intended and not an inverted check.
    assertFalse(err.toString(), err.toString().isEmpty());
    out.reset();
    err.reset();
  }
}
}