blob: db020ec04aeef935f91d42f36873b9c72306037e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.hcat;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HCatUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Tests with operations with hcat feed.
*/
@Test(groups = "embedded")
public class HCatFeedOperationsTest extends BaseTestClass {

    /** Database that holds every table used by this test class. */
    private static final String DB_NAME = "default";
    /** Table created on both the source and the target cluster. */
    private static final String TABLE_NAME = "hcatFeedOperationsTest";
    /** Table created only on the source cluster; used to exercise submit failure paths. */
    private static final String RANDOM_TBL_NAME = "randomTable_HcatFeedOperationsTest";

    private ColoHelper cluster = servers.get(0);
    private OozieClient clusterOC = serverOC.get(0);
    private HCatClient clusterHC;

    private ColoHelper cluster2 = servers.get(1);
    private OozieClient cluster2OC = serverOC.get(1);
    private HCatClient cluster2HC;

    // Feed XML under test; populated by the submit/schedule tests and reused by the
    // suspend/resume and delete tests, which call the schedule test directly.
    private String feed;
    private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";

    /**
     * Uploads the aggregator workflow and creates the HCat tables the tests operate on:
     * a partitioned table on the source, an extra "random" table on the source only,
     * and an unpartitioned table of the same name on the target.
     *
     * @throws Exception if the workflow upload or any table creation fails
     */
    @BeforeClass(alwaysRun = true)
    public void createTestData() throws Exception {
        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
        clusterHC = cluster.getClusterHelper().getHCatClient();
        cluster2HC = cluster2.getClusterHelper().getHCatClient();

        // create an empty table for feed operations
        ArrayList<HCatFieldSchema> partitions = new ArrayList<>();
        partitions.add(HCatUtil.getStringSchema("year", "yearPartition"));
        createEmptyTable(clusterHC, DB_NAME, TABLE_NAME, partitions);

        // A random table to test submission of replication feed when table doesn't exist on target
        createEmptyTable(clusterHC, DB_NAME, RANDOM_TBL_NAME, partitions);

        // create empty table on target cluster
        createEmptyTable(cluster2HC, DB_NAME, TABLE_NAME, new ArrayList<HCatFieldSchema>());
    }

    /**
     * Prepares a fresh pair of bundles (one per cluster) from the shared HCat bundle,
     * each pointing its REGISTRY interface at the corresponding cluster's HCat endpoint.
     *
     * @throws Exception if reading or customizing the bundle fails
     */
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
        Bundle bundle = BundleUtil.readHCatBundle();

        bundles[0] = new Bundle(bundle, cluster.getPrefix());
        bundles[0].generateUniqueBundle(this);
        bundles[0].setClusterInterface(Interfacetype.REGISTRY, cluster.getClusterHelper().getHCatEndpoint());

        bundles[1] = new Bundle(bundle, cluster2.getPrefix());
        bundles[1].generateUniqueBundle(this);
        bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper().getHCatEndpoint());
    }

    /**
     * Removes the entities submitted by each test.
     *
     * @throws HCatException declared for compatibility with the cleanup call chain
     */
    @AfterMethod(alwaysRun = true)
    public void tearDown() throws HCatException {
        removeTestClassEntities();
    }

    /**
     * Drops the HCat tables created in {@link #createTestData()} on both clusters.
     *
     * @throws IOException if a drop-table call fails
     */
    @AfterClass(alwaysRun = true)
    public void tearDownClass() throws IOException {
        clusterHC.dropTable(DB_NAME, TABLE_NAME, true);
        clusterHC.dropTable(DB_NAME, RANDOM_TBL_NAME, true);
        cluster2HC.dropTable(DB_NAME, TABLE_NAME, true);
    }

    /**
     * Submit Hcat feed when Hcat table mentioned in table uri does not exist. Response should reflect failure.
     *
     * @throws Exception if submitting the cluster or building the feed fails unexpectedly
     */
    @Test(groups = {"multiCluster"})
    public void submitFeedWhenTableDoesNotExist() throws Exception {
        Bundle.submitCluster(bundles[1]);
        feed = bundles[1].getInputFeedFromBundle();
        FeedMerlin feedObj = new FeedMerlin(feed);
        // RANDOM_TBL_NAME exists only on the source cluster, so this submit must fail.
        feedObj.setTableValue(DB_NAME, RANDOM_TBL_NAME, FreqType.YEARLY.getHcatPathValue());
        ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
        AssertUtil.assertFailed(response);
    }

    /**
     * Submit Hcat feed when Hcat table mentioned in table uri exists. Delete that feed, and re-submit.
     * All responses should reflect success.
     *
     * @throws Exception if submitting the cluster or building the feed fails unexpectedly
     */
    @Test(groups = {"singleCluster"})
    public void submitFeedPostDeletionWhenTableExists() throws Exception {
        Bundle.submitCluster(bundles[0]);
        feed = bundles[0].getInputFeedFromBundle();
        FeedMerlin feedObj = new FeedMerlin(feed);
        feedObj.setTableValue(DB_NAME, TABLE_NAME, FreqType.YEARLY.getHcatPathValue());

        ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
        AssertUtil.assertSucceeded(response);

        response = prism.getFeedHelper().delete(feedObj.toString());
        AssertUtil.assertSucceeded(response);

        // re-submission after delete should succeed as well
        response = prism.getFeedHelper().submitEntity(feedObj.toString());
        AssertUtil.assertSucceeded(response);
    }

    /**
     * Submit Hcat Replication feed when Hcat table mentioned in table uri exists on both source and target.
     * The response should be succeeded, and a replication coordinator should appear on target oozie.
     * The test however does not ensure that replication goes through.
     *
     * @throws Exception if submitting, scheduling, or inspecting oozie fails unexpectedly
     */
    @Test(groups = {"multiCluster"})
    public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception {
        Bundle.submitCluster(bundles[0], bundles[1]);
        final String startDate = "2010-01-01T20:00Z";
        final String endDate = "2099-01-01T00:00Z";
        String tableUri = "catalog:" + DB_NAME + ":" + TABLE_NAME + "#year=${YEAR}";
        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
        bundles[0].setInputFeedValidity(startDate, endDate);
        bundles[0].setInputFeedTableUri(tableUri);

        feed = bundles[0].getDataSets().get(0);
        // set cluster 2 as the target.
        feed = FeedMerlin.fromString(feed).addFeedCluster(
            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                .withRetention("months(9000)", ActionType.DELETE)
                .withValidity(startDate, endDate)
                .withClusterType(ClusterType.TARGET)
                .withTableUri(tableUri)
                .build()).toString();

        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0);
        // exactly one REPLICATION coordinator is expected on the target oozie
        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
        //This test doesn't wait for replication to succeed.
    }

    /**
     * Submit Hcat Replication feed. Suspend the feed, and check that feed was suspended on
     * both clusters. Now resume feed, and check that status is running on both clusters.
     * The test however does not ensure that replication goes through.
     *
     * @throws Exception if scheduling, suspending, or resuming the feed fails unexpectedly
     */
    @Test(groups = {"multiCluster"})
    public void suspendAndResumeReplicationFeed() throws Exception {
        submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();

        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed));

        //check that feed suspended on both clusters
        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.SUSPENDED);

        AssertUtil.assertSucceeded(prism.getFeedHelper().resume(feed));

        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.RUNNING);
    }

    /**
     * Submit Hcat Replication feed. Delete the feed, and check that feed was deleted on
     * both clusters. The test however does not ensure that replication goes through.
     *
     * @throws Exception if scheduling or deleting the feed fails unexpectedly
     */
    @Test(groups = {"multiCluster"})
    public void deleteReplicationFeed() throws Exception {
        submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();

        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed));
        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.KILLED);
    }

    /**
     * Creates an external text-file table with a single string column "id" and the given
     * partition columns, if it does not already exist.
     *
     * @param cli           HCat client for the cluster the table should be created on
     * @param dbName        database to create the table in
     * @param tabName       name of the table to create
     * @param partitionCols partition columns for the table (may be empty)
     * @throws HCatException if the create-table call fails
     */
    private static void createEmptyTable(HCatClient cli, String dbName, String tabName,
                                         List<HCatFieldSchema> partitionCols) throws HCatException {
        ArrayList<HCatFieldSchema> cols = new ArrayList<>();
        cols.add(HCatUtil.getStringSchema("id", "id comment"));
        HCatCreateTableDesc tableDesc = HCatCreateTableDesc
            .create(dbName, tabName, cols)
            .partCols(partitionCols)
            .fileFormat("textfile")
            .ifNotExists(true)
            .isTableExternal(true)
            .build();
        cli.createTable(tableDesc);
    }
}