/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.hive.dr;

import org.apache.falcon.cli.FalconCLI;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.regression.Entities.ClusterMerlin;
import org.apache.falcon.regression.Entities.RecipeMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.supportClasses.ExecResult;
import org.apache.falcon.regression.core.supportClasses.NotifyingAssert;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HiveAssert;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.MatrixUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.log4j.Logger;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.asserts.SoftAssert;

import java.io.IOException;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalTable;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalPartitionedTable;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createPartitionedTable;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createSerDeTable;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable;

/**
 * Hive DR (disaster recovery) replication tests.
 */
@Test(groups = {"embedded", "multiCluster"})
public class HiveDRTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(HiveDRTest.class);
private static final String DB_NAME = "hdr_sdb1";
private final ColoHelper cluster = servers.get(0);
private final ColoHelper cluster2 = servers.get(1);
private final ColoHelper cluster3 = servers.get(2);
private final FileSystem clusterFS = serverFS.get(0);
private final FileSystem clusterFS2 = serverFS.get(1);
private final FileSystem clusterFS3 = serverFS.get(2);
private final OozieClient clusterOC = serverOC.get(0);
private final OozieClient clusterOC2 = serverOC.get(1);
private final OozieClient clusterOC3 = serverOC.get(2);
private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HiveDR/";
private HCatClient clusterHC;
private HCatClient clusterHC2;
private RecipeMerlin recipeMerlin;
private Connection connection;
private Connection connection2;
@DataProvider
public Object[][] getRecipeLocation() {
return MatrixUtil.crossProduct(RecipeExecLocation.values());
}
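
    /**
     * Common setup: reads HCat bundles for the source and target clusters, submits the cluster
     * entities, prepares the HiveDR recipe (the secure variant on secure clusters) with a
     * 5 minute frequency, and recreates a fresh test database on both Hive servers.
     */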
private void setUp(RecipeExecLocation recipeExecLocation) throws Exception {
clusterHC = cluster.getClusterHelper().getHCatClient();
clusterHC2 = cluster2.getClusterHelper().getHCatClient();
bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster);
bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2);
bundles[0].generateUniqueBundle(this);
bundles[1].generateUniqueBundle(this);
final ClusterMerlin srcCluster = bundles[0].getClusterElement();
final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
String recipeDir = "HiveDrRecipe";
if (MerlinConstants.IS_SECURE) {
recipeDir = "HiveDrSecureRecipe";
}
Bundle.submitCluster(recipeExecLocation.getRecipeBundle(bundles[0], bundles[1]));
recipeMerlin = RecipeMerlin.readFromDir(recipeDir, FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
.withRecipeCluster(recipeExecLocation.getRecipeCluster(srcCluster, tgtCluster));
recipeMerlin.withSourceCluster(srcCluster)
.withTargetCluster(tgtCluster)
.withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
.withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(15));
recipeMerlin.setUniqueName(this.getClass().getSimpleName());
        connection = cluster.getClusterHelper().getHiveJdbcConnection();
        runSql(connection, "drop database if exists " + DB_NAME + " cascade");
        runSql(connection, "create database " + DB_NAME);
        runSql(connection, "use " + DB_NAME);
        connection2 = cluster2.getClusterHelper().getHiveJdbcConnection();
        runSql(connection2, "drop database if exists " + DB_NAME + " cascade");
        runSql(connection2, "create database " + DB_NAME);
        runSql(connection2, "use " + DB_NAME);
}
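
    /**
     * One source table, one target table. After the bootstrap copy, partitions are added,
     * dropped and replaced at the source; replication should bring the target to the same
     * state, and CLI metrics for the recipe should be available.
     */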
@Test(dataProvider = "getRecipeLocation")
public void drPartition(final RecipeExecLocation recipeExecLocation) throws Exception {
setUp(recipeExecLocation);
final String tblName = "partitionDR";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
runSql(connection,
"create table " + tblName + "(comment string) partitioned by (pname string)");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'DELETE') values"
+ "('this partition is going to be deleted - should NOT appear after dr')");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'REPLACE') values"
+ "('this partition is going to be replaced - should NOT appear after dr')");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'ADD_DATA') values"
+ "('this partition will have more data - should appear after dr')");
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
runSql(connection,
"insert into table " + tblName + " partition (pname = 'NEW_PART') values"
+ "('this partition has been added post bootstrap - should appear after dr')");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'ADD_DATA') values"
+ "('more data has been added post bootstrap - should appear after dr')");
runSql(connection,
"alter table " + tblName + " drop partition(pname = 'DELETE')");
runSql(connection,
"alter table " + tblName + " drop partition(pname = 'REPLACE')");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'REPLACE') values"
+ "('this partition has been replaced - should appear after dr')");
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
ExecResult execResult = cluster.getProcessHelper().getCLIMetrics(recipeMerlin.getName());
AssertUtil.assertCLIMetrics(execResult, recipeMerlin.getName(), 1, true);
}
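
    /**
     * After the bootstrap copy, an existing partition is overwritten and a new one is created
     * via "insert overwrite" from a helper table; both changes should appear at the target.
     */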
@Test
public void drInsertOverwritePartition() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "drInsertOverwritePartition";
final String hlpTblName = "drInsertOverwritePartitionHelperTbl";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
runSql(connection, "create table " + hlpTblName + "(comment string)");
runSql(connection,
"insert into table " + hlpTblName
+ " values('overwrite data - should appear after dr')");
runSql(connection,
"insert into table " + hlpTblName + " values('newdata row2 - should appear after dr')");
runSql(connection,
"insert into table " + hlpTblName + " values('newdata row1 - should appear after dr')");
runSql(connection,
"create table " + tblName + "(comment string) partitioned by (pname string)");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'OLD_PART') values"
+ "('this data should be retained - should appear after dr')");
runSql(connection,
"insert into table " + tblName + " partition (pname = 'OVERWRITE_PART') values"
+ "('this data should get overwritten - should NOT appear after dr')");
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
runSql(connection,
"insert overwrite table " + tblName + " partition (pname = 'OVERWRITE_PART') "
+ "select * from " + hlpTblName + " where comment REGEXP '^overwrite'");
runSql(connection,
"insert overwrite table " + tblName + " partition (pname = 'NEW_DATA') "
+ "select * from " + hlpTblName + " where comment REGEXP '^newdata'");
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
}
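
    /**
     * Replicate two tables with a single recipe by supplying a comma-separated table list;
     * rows inserted into both tables after the bootstrap copy should appear at the target.
     */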
@Test
public void drTwoTablesOneRequest() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.TargetCluster;
setUp(recipeExecLocation);
final String tblName = "firstTableDR";
final String tbl2Name = "secondTableDR";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName + ',' + tbl2Name);
final List<String> command = recipeMerlin.getSubmissionCommand();
runSql(connection,
"create table " + tblName + "(comment string)");
runSql(connection,
"create table " + tbl2Name + "(comment string)");
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
bootstrapCopy(connection, clusterFS, tbl2Name, connection2, clusterFS2, tbl2Name);
runSql(connection,
"insert into table " + tblName + " values"
+ "('this string has been added post bootstrap - should appear after dr')");
runSql(connection,
"insert into table " + tbl2Name + " values"
+ "('this string has been added post bootstrap - should appear after dr')");
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
final NotifyingAssert anAssert = new NotifyingAssert(true);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tbl2Name),
cluster2, clusterHC2.getTable(DB_NAME, tbl2Name), anAssert);
anAssert.assertAll();
}
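
    /**
     * Replicate a table defined with a JSON SerDe; a row inserted and a SerDe property set
     * after the bootstrap copy should both be carried over to the target.
     */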
@Test
public void drSerDeWithProperties() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "serdeTable";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
runSql(connection,
"create table " + tblName + "(comment string) "
+ "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe'");
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
runSql(connection,
"insert into table " + tblName + " values"
+ "('this string has been added post bootstrap - should appear after dr')");
runSql(connection,
"ALTER TABLE " + tblName + " SET SERDEPROPERTIES ('someProperty' = 'value')");
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
}
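
    /**
     * Change a column's type and comment at the source after the recipe is submitted; the
     * schema change should be replicated to the target.
     */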
@Test
public void drChangeColumn() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "tableForColumnChange";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command1 = recipeMerlin.getSubmissionCommand();
final String recipe1Name = recipeMerlin.getName();
runSql(connection,
"create table " + tblName + "(id int)");
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed.");
runSql(connection,
"ALTER TABLE " + tblName + " CHANGE id id STRING COMMENT 'some_comment'");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipe1Name, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
}
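
    /**
     * Replicate one source table to two different target clusters via two separate recipe
     * submissions, then verify the table is identical on all three clusters.
     */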
@Test
public void drTwoDstTablesTwoRequests() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.TargetCluster;
setUp(recipeExecLocation);
final HCatClient clusterHC3 = cluster3.getClusterHelper().getHCatClient();
final Connection connection3 = cluster3.getClusterHelper().getHiveJdbcConnection();
        runSql(connection3, "drop database if exists " + DB_NAME + " cascade");
        runSql(connection3, "create database " + DB_NAME);
        runSql(connection3, "use " + DB_NAME);
final String tblName = "vanillaTable";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final String recipe1Name = recipeMerlin.getName();
final List<String> command1 = recipeMerlin.getSubmissionCommand();
final Bundle bundle3 = new Bundle(BundleUtil.readHCatBundle(), cluster3);
bundle3.generateUniqueBundle(this);
bundle3.submitClusters(prism);
recipeMerlin.withTargetCluster(bundle3.getClusterElement())
.withRecipeCluster(recipeExecLocation.getRecipeCluster(
bundles[0].getClusterElement(), bundle3.getClusterElement()));
recipeMerlin.setUniqueName(this.getClass().getSimpleName());
final List<String> command2 = recipeMerlin.getSubmissionCommand();
final String recipe2Name = recipeMerlin.getName();
runSql(connection, "create table " + tblName + "(comment string)");
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
bootstrapCopy(connection, clusterFS, tblName, connection3, clusterFS3, tblName);
runSql(connection,
"insert into table " + tblName + " values"
+ "('this string has been added post bootstrap - should appear after dr')");
Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed.");
Assert.assertEquals(Bundle.runFalconCLI(command2), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipe1Name, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC3),
recipe2Name, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
final NotifyingAssert anAssert = new NotifyingAssert(true);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster3, clusterHC3.getTable(DB_NAME, tblName), anAssert);
anAssert.assertAll();
}
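
    /**
     * Replicate an external source table to a non-external target table. The data should
     * match, while the table type and the EXTERNAL table property are expected to differ.
     */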
@Test
public void drExternalToNonExternal() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "externalToNonExternal";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
createExternalTable(connection, clusterFS, baseTestHDFSDir + "click_data/", tblName);
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
//change column name
runSql(connection,
"alter table " + tblName + " change column data data_new string");
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
final NotifyingAssert anAssert = new NotifyingAssert(true);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
        anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(),
            clusterHC.getTable(DB_NAME, tblName).getTabletype(),
            "Source and destination tables should have different Tabletype");
anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
"Source and destination tables should have different value of property EXTERNAL");
anAssert.assertAll();
}
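
    /**
     * Replicate an external partitioned source table to a non-external partitioned target
     * table. The data should match, while the table type and EXTERNAL property differ.
     */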
@Test
public void drExtPartitionedToNonExtPartitioned() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "extPartitionedToNonExtPartitioned";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
createExternalPartitionedTable(connection, clusterFS,
baseTestHDFSDir + "click_data/", tblName);
runSql(connection2,
"create table " + tblName + " (data string, time string) partitioned by (date_ string)");
runSql(connection2, "alter table " + tblName + " add partition "
+ "(date_='2001-01-01') location '" + baseTestHDFSDir + "click_data/2001-01-01/'");
runSql(connection2, "alter table " + tblName + " add partition "
+ "(date_='2001-01-02') location '" + baseTestHDFSDir + "click_data/2001-01-02/'");
runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-01') "
+ "values ('click1', '01:01:01')");
runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-02') "
+ "values ('click2', '02:02:02')");
final NotifyingAssert anAssert = new NotifyingAssert(true);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
//change column name
runSql(connection,
"alter table " + tblName + " change column data data_new string");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
        anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(),
            clusterHC.getTable(DB_NAME, tblName).getTabletype(),
            "Source and destination tables should have different Tabletype");
anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
"Source and destination tables should have different value of property EXTERNAL");
anAssert.assertAll();
}
    /**
     * One source table, one target table. Change the table properties and the comment at the
     * source; the changes should be reflected at the target after replication.
     */
@Test
public void drChangeCommentAndPropertyTest() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "myTable";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
runSql(connection, "create table " + tblName + "(field string)");
//add new table property
runSql(connection,
"ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'initialValue')");
//set comment
runSql(connection,
"ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment will be "
+ "changed, SHOULD NOT appear')");
LOGGER.info(tblName + " before bootstrap copy: ");
runSql(connection, "describe extended " + tblName);
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
//change table property and comment
runSql(connection,
"ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'anotherValue')");
runSql(connection,
"ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment should "
+ "appear after replication done')");
LOGGER.info(tblName + " after modifications, before replication: ");
runSql(connection, "describe extended " + tblName);
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
}
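
    /**
     * Create the same set of tables (vanilla, SerDe, partitioned, external and external
     * partitioned) on both clusters and verify that the databases compare as equal.
     */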
@Test
public void dataGeneration() throws Exception {
setUp(RecipeExecLocation.SourceCluster);
        runSql(connection, "use " + DB_NAME);
createVanillaTable(connection, "store_sales");
createSerDeTable(connection);
createPartitionedTable(connection);
createExternalTable(connection, clusterFS,
baseTestHDFSDir + "click_data/", "click_data");
createExternalPartitionedTable(connection, clusterFS,
baseTestHDFSDir + "click_data2/", "click_data2");
        runSql(connection2, "use " + DB_NAME);
createVanillaTable(connection2, "store_sales");
createSerDeTable(connection2);
createPartitionedTable(connection2);
createExternalTable(connection2, clusterFS2,
baseTestHDFSDir + "click_data/", "click_data");
createExternalPartitionedTable(connection2, clusterFS2,
baseTestHDFSDir + "click_data2/", "click_data2");
final NotifyingAssert anAssert = new NotifyingAssert(true);
        HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(DB_NAME),
            cluster2, clusterHC2.getDatabase(DB_NAME), anAssert);
        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, "click_data"),
            cluster2, clusterHC2.getTable(DB_NAME, "click_data"), anAssert);
anAssert.assertAll();
}
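
    /**
     * Sanity check of the Hive assertion utilities; disabled because it expects pre-existing
     * hcatsmoke tables in the default database.
     */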
@Test(enabled = false)
public void assertionTest() throws Exception {
setUp(RecipeExecLocation.SourceCluster);
final SoftAssert anAssert = new SoftAssert();
HiveAssert.assertTableEqual(
cluster, clusterHC.getTable("default", "hcatsmoke10546"),
cluster2, clusterHC2.getTable("default", "hcatsmoke10548"), anAssert);
HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("default"), cluster2,
clusterHC2.getDatabase("default"), anAssert);
anAssert.assertAll();
}
    /**
     * Test creates a table on the first cluster using static partitioning. Then it creates
     * the same table on the second cluster using dynamic partitioning. Finally it checks
     * that the two tables are equal.
     * @throws Exception if table creation or the comparison fails
     */
@Test
public void dynamicPartitionsTest() throws Exception {
setUp(RecipeExecLocation.SourceCluster);
//create table with static partitions on first cluster
createPartitionedTable(connection, false);
//create table with dynamic partitions on second cluster
createPartitionedTable(connection2, true);
//check that both tables are equal
        HiveAssert.assertTableEqual(
            cluster, clusterHC.getTable(DB_NAME, "global_store_sales"),
            cluster2, clusterHC2.getTable(DB_NAME, "global_store_sales"), new SoftAssert()
        ).assertAll();
}
    /**
     * One source table, one target table. Insert, drop and replace partitions using dynamic
     * partition queries; the changes should be reflected at the target.
     */
@Test
public void drInsertDropReplaceDynamicPartition() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "dynamicPartitionDR";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
//disable strict mode to use only dynamic partition
runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
runSql(connection,
"create table " + tblName + "(comment string) partitioned by (pname string)");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this partition is going to be deleted - should NOT appear after dr', 'DELETE')");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this partition is going to be replaced - should NOT appear after dr', 'REPLACE')");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this partition will have more data - should appear after dr', 'ADD_DATA')");
LOGGER.info(tblName + " before bootstrap copying: ");
runSql(connection, "select * from " + tblName);
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this partition has been added post bootstrap - should appear after dr', 'NEW_PART')");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('more data has been added post bootstrap - should appear after dr', 'ADD_DATA')");
runSql(connection,
"alter table " + tblName + " drop partition(pname = 'DELETE')");
runSql(connection,
"alter table " + tblName + " drop partition(pname = 'REPLACE')");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this partition has been replaced - should appear after dr', 'REPLACE')");
LOGGER.info(tblName + " after modifications, before replication: ");
runSql(connection, "select * from " + tblName);
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
}
    /**
     * One source table, one target table. Overwrite partitions using dynamic partition
     * queries; the changes should be reflected at the target.
     */
@Test
public void drInsertOverwriteDynamicPartition() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String tblName = "drInsertOverwritePartition";
final String hlpTblName = "drInsertOverwritePartitionHelperTbl";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
//disable strict mode to use only dynamic partition
runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
runSql(connection,
"create table " + hlpTblName + "(comment string) partitioned by (pname string)");
runSql(connection,
"insert into table " + hlpTblName + " partition (pname)"
+ " values('overwrite data - should appear after dr', 'OVERWRITE_PART')");
runSql(connection,
"insert into table " + hlpTblName + " partition (pname)"
+ " values('newdata row2 - should appear after dr', 'NEW_DATA')");
runSql(connection,
"insert into table " + hlpTblName + " partition (pname)"
+ " values('newdata row1 - should appear after dr', 'NEW_DATA')");
runSql(connection,
"create table " + tblName + "(comment string) partitioned by (pname string)");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this data should be retained - should appear after dr', 'OLD_PART')");
runSql(connection,
"insert into table " + tblName + " partition (pname) values"
+ "('this data should get overwritten - should NOT appear after dr', 'OVERWRITE_PART')");
LOGGER.info(tblName + " before bootstrap copying: ");
runSql(connection, "select * from " + tblName);
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
runSql(connection,
"insert overwrite table " + tblName + " partition (pname) "
+ "select comment, pname from " + hlpTblName + " where comment REGEXP '^overwrite'");
runSql(connection,
"insert overwrite table " + tblName + " partition (pname) "
+ "select comment, pname from " + hlpTblName + " where comment REGEXP '^newdata'");
LOGGER.info(tblName + " after modifications, before replication: ");
runSql(connection, "select * from " + tblName);
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
).assertAll();
}
    /**
     * Run the recipe with different frequencies; submission should succeed. Then check the
     * frequency of the launched Oozie job.
     */
@Test(dataProvider = "frequencyGenerator")
public void differentRecipeFrequenciesTest(String frequency) throws Exception {
setUp(RecipeExecLocation.SourceCluster);
LOGGER.info("Testing with frequency: " + frequency);
String tblName = "myTable";
recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName)
.withFrequency(new Frequency(frequency));
runSql(connection, "create table " + tblName + "(comment string)");
final List<String> command = recipeMerlin.getSubmissionCommand();
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
LOGGER.info("Submission went through.");
InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
String filter = "name=FALCON_PROCESS_" + recipeMerlin.getName();
List<BundleJob> bundleJobs = OozieUtil.getBundles(clusterOC, filter, 0, 10);
List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs);
String bundleId = OozieUtil.getMaxId(bundleIds);
List<CoordinatorJob> coords = clusterOC.getBundleJobInfo(bundleId).getCoordinators();
List<String> cIds = new ArrayList<String>();
for (CoordinatorJob coord : coords) {
cIds.add(coord.getId());
}
String coordId = OozieUtil.getMinId(cIds);
CoordinatorJob job = clusterOC.getCoordJobInfo(coordId);
CoordinatorJob.Timeunit timeUnit = job.getTimeUnit();
String freq = job.getFrequency();
LOGGER.info("Frequency of running job: " + timeUnit + " " + freq);
String unit = timeUnit.name().toLowerCase().replace("_", "");
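        // the coordinator reports an hours(x) frequency in minutes; convert back before comparing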
if (frequency.contains("hours")) {
unit = "hours";
freq = String.valueOf(Integer.parseInt(freq) / 60);
}
Assert.assertTrue(frequency.contains(unit)
&& frequency.contains(freq), "Running job has different frequency.");
}
@DataProvider(name = "frequencyGenerator")
public Object[][] frequencyGenerator() {
return new Object[][]{{"minutes(10)"}, {"minutes(10000)"}, {"hours(5)"}, {"hours(5000)"},
{"days(3)"}, {"days(3000)"}, {"months(1)"}, {"months(1000)"}, };
}
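
    /**
     * Cleanup: best-effort deletion of the submitted recipe process, followed by removal of
     * test entities and test directories.
     */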
@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
try {
prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null);
} catch (Exception e) {
LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e);
}
removeTestClassEntities();
cleanTestsDirs();
}
}