/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.regression.hive.dr;

import org.apache.falcon.cli.FalconCLI;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.regression.Entities.ClusterMerlin;
import org.apache.falcon.regression.Entities.RecipeMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.supportClasses.NotifyingAssert;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.Config;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.HiveAssert;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.MatrixUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy;
import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable;

/**
* Hive disaster recovery (DR) tests for Hive database-level replication.
*/
@Test(groups = {"embedded", "multiCluster"})
public class HiveDbDRTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(HiveDbDRTest.class);
private final ColoHelper cluster = servers.get(0);
private final ColoHelper cluster2 = servers.get(1);
private final FileSystem clusterFS = serverFS.get(0);
private final FileSystem clusterFS2 = serverFS.get(1);
private final OozieClient clusterOC = serverOC.get(0);
private final OozieClient clusterOC2 = serverOC.get(1);
private HCatClient clusterHC;
private HCatClient clusterHC2;
private RecipeMerlin recipeMerlin;
private Connection connection;
private Connection connection2;
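
/**
* Supplies every RecipeExecLocation value (i.e. where the DR recipe runs, such as the
* source or the target cluster) so the data-driven tests cover each location.
*/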
@DataProvider
public Object[][] getRecipeLocation() {
return MatrixUtil.crossProduct(RecipeExecLocation.values());
}
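
/**
* Common setup: reads an HCat bundle for each cluster, submits the cluster entity for the
* recipe's execution cluster, loads the (secure or non-secure) Hive DR recipe with the
* source/target clusters, a 5-minute frequency and a short validity window, and opens
* Hive JDBC connections to both clusters.
*/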
private void setUp(RecipeExecLocation recipeExecLocation) throws Exception {
clusterHC = cluster.getClusterHelper().getHCatClient();
clusterHC2 = cluster2.getClusterHelper().getHCatClient();
bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster);
bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2);
bundles[0].generateUniqueBundle(this);
bundles[1].generateUniqueBundle(this);
final ClusterMerlin srcCluster = bundles[0].getClusterElement();
final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
Bundle.submitCluster(recipeExecLocation.getRecipeBundle(bundles[0], bundles[1]));
String recipeDir = "HiveDrRecipe";
if (MerlinConstants.IS_SECURE) {
recipeDir = "HiveDrSecureRecipe";
}
recipeMerlin = RecipeMerlin.readFromDir(recipeDir, FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
.withRecipeCluster(recipeExecLocation.getRecipeCluster(srcCluster, tgtCluster));
recipeMerlin.withSourceCluster(srcCluster)
.withTargetCluster(tgtCluster)
.withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
.withValidity(TimeUtil.getTimeWrtSystemTime(-1), TimeUtil.getTimeWrtSystemTime(11));
recipeMerlin.setUniqueName(this.getClass().getSimpleName());
connection = cluster.getClusterHelper().getHiveJdbcConnection();
connection2 = cluster2.getClusterHelper().getHiveJdbcConnection();
}
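
/** Drops the database (if present) and recreates it, then switches the session to it. */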
private void setUpDb(String dbName, Connection conn) throws SQLException {
runSql(conn, "drop database if exists " + dbName + " cascade");
runSql(conn, "create database " + dbName);
runSql(conn, "use " + dbName);
}
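
/**
* Submits the recipe, drops the source database and expects the drop to be replicated,
* i.e. the database should no longer be listed on the target cluster.
*/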
@Test(dataProvider = "getRecipeLocation")
public void drDbDropDb(final RecipeExecLocation recipeExecLocation) throws Exception {
setUp(recipeExecLocation);
final String dbName = "drDbDropDb";
setUpDb(dbName, connection);
setUpDb(dbName, connection2);
recipeMerlin.withSourceDb(dbName).withSourceTable("*");
final List<String> command = recipeMerlin.getSubmissionCommand();
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
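// drop the source db; the replication instance should propagate the drop to the target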
runSql(connection, "drop database " + dbName);
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
final List<String> dstDbs = runSql(connection2, "show databases");
Assert.assertFalse(dstDbs.contains(dbName), "dstDbs = " + dstDbs + " was not expected to "
+ "contain " + dbName);
}
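
/**
* Fail-then-pass scenario: strips permissions on the target db directory so the first
* replication instance is KILLED, then restores them and waits for a successful instance,
* finally asserting that the table matches on both clusters.
*/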
@Test(dataProvider = "isDBReplication")
public void drDbFailPass(Boolean isDBReplication) throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String dbName = "drDbFailPass";
final String tblName = "vanillaTable";
final String hiveWarehouseLocation = Config.getProperty("hive.warehouse.location", "/apps/hive/warehouse/");
final String dbPath = HadoopUtil.joinPath(hiveWarehouseLocation, dbName.toLowerCase() + ".db");
setUpDb(dbName, connection);
runSql(connection, "create table " + tblName + "(data string)");
setUpDb(dbName, connection2);
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
recipeMerlin.withSourceDb(dbName).withSourceTable(isDBReplication ? "*" : tblName);
final List<String> command = recipeMerlin.getSubmissionCommand();
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
runSql(connection, "insert into table " + tblName + " values('cannot be replicated now')");
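// strip permissions on the target db directory so the in-flight replication fails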
final String noReadWritePerm = "d---r-xr-x";
LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + noReadWritePerm);
clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(noReadWritePerm));
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
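// restore permissions so the next replication attempt can succeed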
final String readWritePerm = "drwxr-xr-x";
LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + readWritePerm);
clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(readWritePerm));
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName),
cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)
).assertAll();
}
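
/**
* DB-level replication of DDL changes: before the first instance two tables are dropped
* and a new one is created on the source; before the second instance a dropped table is
* re-created. After each instance the source and target databases are asserted equal.
*/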
@Test
public void drDbAddDropTable() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String dbName = "drDbAddDropTable";
final String tblToBeDropped = "table_to_be_dropped";
final String tblToBeDroppedAndAdded = "table_to_be_dropped_and_readded";
final String newTableToBeAdded = "new_table_to_be_added";
setUpDb(dbName, connection);
setUpDb(dbName, connection2);
recipeMerlin.withSourceDb(dbName).withSourceTable("*")
.withFrequency(new Frequency("2", Frequency.TimeUnit.minutes));
final List<String> command = recipeMerlin.getSubmissionCommand();
createVanillaTable(connection, tblToBeDropped);
createVanillaTable(connection, tblToBeDroppedAndAdded);
bootstrapCopy(connection, clusterFS, tblToBeDropped,
connection2, clusterFS2, tblToBeDropped);
bootstrapCopy(connection, clusterFS, tblToBeDroppedAndAdded,
connection2, clusterFS2, tblToBeDroppedAndAdded);
/* For the first replication instance: two tables are dropped and one table is added. */
runSql(connection, "drop table " + tblToBeDropped);
runSql(connection, "drop table " + tblToBeDroppedAndAdded);
createVanillaTable(connection, newTableToBeAdded);
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
final NotifyingAssert anAssert = new NotifyingAssert(true);
HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName),
cluster2, clusterHC2.getDatabase(dbName), anAssert);
/* For the second replication instance: a dropped table is added back. */
createVanillaTable(connection, tblToBeDroppedAndAdded);
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName),
cluster2, clusterHC2.getDatabase(dbName), anAssert);
anAssert.assertAll();
}
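
/**
* Non-replicatable objects should be skipped: the view and the offline table's new
* comment must not reach the target, while the ordinary table is replicated.
*/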
@Test
public void drDbNonReplicatableTable() throws Exception {
final RecipeExecLocation recipeExecLocation = RecipeExecLocation.SourceCluster;
setUp(recipeExecLocation);
final String dbName = "drDbNonReplicatableTable";
final String tblName = "vanillaTable";
final String tblView = "vanillaTableView";
final String tblOffline = "offlineTable";
setUpDb(dbName, connection);
setUpDb(dbName, connection2);
recipeMerlin.withSourceDb(dbName).withSourceTable("*")
.withFrequency(new Frequency("2", Frequency.TimeUnit.minutes));
final List<String> command = recipeMerlin.getSubmissionCommand();
createVanillaTable(connection, tblName);
runSql(connection, "create view " + tblView + " as select * from " + tblName);
createVanillaTable(connection, tblOffline);
bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
bootstrapCopy(connection, clusterFS, tblOffline, connection2, clusterFS2, tblOffline);
final String newComment = "'new comment for offline table should not reach destination'";
runSql(connection,
"alter table " + tblOffline + " set tblproperties ('comment' = " + newComment + ")");
runSql(connection, "alter table " + tblOffline + " enable offline");
Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
InstanceUtil.waitTillInstanceReachState(recipeExecLocation.getRecipeOC(clusterOC, clusterOC2),
recipeMerlin.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
// the vanilla table gets replicated; the view and the offline table's change are not
HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName),
cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)).assertAll();
final List<String> dstTables = runSql(connection2, "show tables");
Assert.assertFalse(dstTables.contains(tblView),
"dstTables = " + dstTables + " was not expected to contain " + tblView);
final List<String> dstComment =
runSql(connection2, "show tblproperties " + tblOffline + "('comment')");
Assert.assertFalse(dstComment.contains(newComment),
tblOffline + " comment = " + dstComment + " was not expected to contain " + newComment);
}
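
/** Deletes the submitted DR process (best effort) and cleans up test entities and dirs. */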
@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
try {
prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null);
} catch (Exception e) {
LOGGER.warn("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e);
}
removeTestClassEntities();
cleanTestsDirs();
}
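
/** Supplies both modes for drDbFailPass: replicate the whole DB (true) or a single table (false). */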
@DataProvider
public Object[][] isDBReplication() {
return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
}
}