| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.falcon.regression.hcat; |
| |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.Frequency; |
| import org.apache.falcon.entity.v0.cluster.Interfacetype; |
| import org.apache.falcon.entity.v0.feed.ActionType; |
| import org.apache.falcon.entity.v0.feed.ClusterType; |
| import org.apache.falcon.regression.Entities.FeedMerlin; |
| import org.apache.falcon.regression.core.bundle.Bundle; |
| import org.apache.falcon.regression.core.enumsAndConstants.FeedType; |
| import org.apache.falcon.regression.core.helpers.ColoHelper; |
| import org.apache.falcon.regression.core.response.ServiceResponse; |
| import org.apache.falcon.regression.core.util.AssertUtil; |
| import org.apache.falcon.regression.core.util.BundleUtil; |
| import org.apache.falcon.regression.core.util.HCatUtil; |
| import org.apache.falcon.regression.core.util.OSUtil; |
| import org.apache.falcon.regression.core.util.Util; |
| import org.apache.falcon.regression.core.util.InstanceUtil; |
| import org.apache.falcon.regression.core.util.XmlUtil; |
| import org.apache.falcon.regression.testHelper.BaseTestClass; |
| import org.apache.hive.hcatalog.api.HCatClient; |
| import org.apache.hive.hcatalog.api.HCatCreateTableDesc; |
| import org.apache.hive.hcatalog.common.HCatException; |
| import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; |
| import org.apache.log4j.Logger; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.OozieClient; |
| import org.testng.Assert; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.Test; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| public class HCatFeedOperationsTest extends BaseTestClass { |
| |
| ColoHelper cluster = servers.get(0); |
| OozieClient clusterOC = serverOC.get(0); |
| HCatClient clusterHC = cluster.getClusterHelper().getHCatClient(); |
| |
| ColoHelper cluster2 = servers.get(1); |
| OozieClient cluster2OC = serverOC.get(1); |
| HCatClient cluster2HC = cluster2.getClusterHelper().getHCatClient(); |
| |
| private String dbName = "default"; |
| private String tableName = "hcatFeedOperationsTest"; |
| private String randomTblName = "randomTable_HcatFeedOperationsTest"; |
| private String feed; |
| private String aggregateWorkflowDir = baseHDFSDir + "/HCatFeedOperationsTest/aggregator"; |
| private static final Logger LOGGER = Logger.getLogger(HCatFeedOperationsTest.class); |
| |
| public void uploadWorkflow() throws Exception { |
| uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); |
| } |
| |
| @BeforeClass(alwaysRun = true) |
| public void createTestData() throws Exception { |
| //create an empty table for feed operations |
| ArrayList<HCatFieldSchema> partitions = new ArrayList<HCatFieldSchema>(); |
| partitions.add(HCatUtil.getStringSchema("year", "yearPartition")); |
| createEmptyTable(clusterHC, dbName, tableName, partitions); |
| |
| //A random table to test submission of replication feed when table doesn't exist on target |
| createEmptyTable(clusterHC, dbName, randomTblName, partitions); |
| |
| //create empty table on target cluster |
| createEmptyTable(cluster2HC, dbName, tableName, new ArrayList<HCatFieldSchema>()); |
| } |
| |
| @BeforeMethod(alwaysRun = true) |
| public void setUp(Method method) throws Exception { |
| LOGGER.info("test name: " + method.getName()); |
| Bundle bundle = BundleUtil.readHCatBundle(); |
| bundles[0] = new Bundle(bundle, cluster.getPrefix()); |
| bundles[0].generateUniqueBundle(); |
| bundles[0].setClusterInterface(Interfacetype.REGISTRY, cluster.getClusterHelper().getHCatEndpoint()); |
| |
| |
| bundles[1] = new Bundle(bundle, cluster2.getPrefix()); |
| bundles[1].generateUniqueBundle(); |
| bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper().getHCatEndpoint()); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void tearDown() throws HCatException { |
| removeBundles(); |
| } |
| |
| @AfterClass(alwaysRun = true) |
| public void tearDownClass() throws IOException { |
| clusterHC.dropTable(dbName, tableName, true); |
| clusterHC.dropTable(dbName, randomTblName, true); |
| cluster2HC.dropTable(dbName, tableName, true); |
| cleanTestDirs(); |
| } |
| |
| /** |
| * Submit Hcat feed when Hcat table mentioned in table uri does not exist. Response should reflect failure. |
| * |
| * @throws Exception |
| */ |
| @Test(groups = {"singleCluster"}) |
| public void submitFeedWhenTableDoesNotExist() throws Exception { |
| Bundle.submitCluster(bundles[1]); |
| feed = bundles[1].getInputFeedFromBundle(); |
| FeedMerlin feedObj = new FeedMerlin(feed); |
| feedObj.setTableValue(dbName, randomTblName, FeedType.YEARLY.getHcatPathValue()); |
| ServiceResponse response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString()); |
| AssertUtil.assertFailed(response); |
| } |
| |
| /** |
| * Submit Hcat feed when Hcat table mentioned in table uri exists. Delete that feed, and re-submit. |
| * All responses should reflect success. |
| * |
| * @throws Exception |
| */ |
| @Test(groups = {"singleCluster"}) |
| public void submitFeedPostDeletionWhenTableExists() throws Exception { |
| Bundle.submitCluster(bundles[0]); |
| feed = bundles[0].getInputFeedFromBundle(); |
| FeedMerlin feedObj = new FeedMerlin(feed); |
| feedObj.setTableValue(dbName, tableName, FeedType.YEARLY.getHcatPathValue()); |
| ServiceResponse response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString()); |
| AssertUtil.assertSucceeded(response); |
| |
| response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feedObj.toString()); |
| AssertUtil.assertSucceeded(response); |
| |
| response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString()); |
| AssertUtil.assertSucceeded(response); |
| } |
| |
| /** |
| * Submit Hcat Replication feed when Hcat table mentioned in table uri does not exist on target. The response is |
| * Partial, with successful with submit/schedule on source. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void submitAndScheduleReplicationFeedWhenTableDoesNotExistOnTarget() throws Exception { |
| Bundle.submitCluster(bundles[0], bundles[1]); |
| final String startDate = "2010-01-01T20:00Z"; |
| final String endDate = "2099-01-01T00:00Z"; |
| String tableUri = "catalog:" + dbName + ":" + randomTblName + "#year=${YEAR}"; |
| bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours); |
| bundles[0].setInputFeedValidity(startDate, endDate); |
| bundles[0].setInputFeedTableUri(tableUri); |
| |
| feed = bundles[0].getDataSets().get(0); |
| // set cluster 2 as the target. |
| feed = InstanceUtil.setFeedClusterWithTable(feed, |
| XmlUtil.createValidity(startDate, endDate), |
| XmlUtil.createRtention("months(9000)", ActionType.DELETE), |
| Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null, |
| tableUri); |
| |
| AssertUtil.assertPartial( |
| prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, |
| feed) |
| ); |
| } |
| |
| /** |
| * Submit Hcat Replication feed when Hcat table mentioned in table uri exists on both source and target. The response is |
| * Psucceeded, and a replication co-rdinator should apear on target oozie. The test however does not ensure that |
| * replication goes through. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception { |
| Bundle.submitCluster(bundles[0], bundles[1]); |
| final String startDate = "2010-01-01T20:00Z"; |
| final String endDate = "2099-01-01T00:00Z"; |
| String tableUri = "catalog:" + dbName + ":" + tableName + "#year=${YEAR}"; |
| bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours); |
| bundles[0].setInputFeedValidity(startDate, endDate); |
| bundles[0].setInputFeedTableUri(tableUri); |
| |
| feed = bundles[0].getDataSets().get(0); |
| // set cluster 2 as the target. |
| feed = InstanceUtil.setFeedClusterWithTable(feed, |
| XmlUtil.createValidity(startDate, endDate), |
| XmlUtil.createRtention("months(9000)", ActionType.DELETE), |
| Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null, |
| tableUri); |
| |
| AssertUtil.assertSucceeded( |
| prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, |
| feed) |
| ); |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), |
| "REPLICATION"), 1); |
| //This test doesn't wait for replication to succeed. |
| } |
| |
| /** |
| * Submit Hcat Replication feed. Suspend the feed, and check that feed was suspended on |
| * both clusters. Now resume feed, and check that status is running on both clusters. |
| * The test however does not ensure that replication goes through. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void suspendAndResumeReplicationFeed() throws Exception { |
| |
| submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget(); |
| |
| AssertUtil.assertSucceeded( |
| prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, |
| feed) |
| ); |
| |
| //check that feed suspended on both clusters |
| AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); |
| AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.SUSPENDED); |
| |
| AssertUtil.assertSucceeded( |
| prism.getFeedHelper().resume(Util.URLS.RESUME_URL, |
| feed) |
| ); |
| |
| AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); |
| AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.RUNNING); |
| } |
| |
| /** |
| * Submit Hcat Replication feed. Delete the feed, and check that feed was deleted on |
| * both clusters. The test however does not ensure that replication goes through. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void deleteReplicationFeed() throws Exception { |
| submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget(); |
| |
| AssertUtil.assertSucceeded( |
| prism.getFeedHelper().delete(Util.URLS.DELETE_URL, |
| feed) |
| ); |
| AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED); |
| AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.KILLED); |
| } |
| |
| |
| public static void createEmptyTable(HCatClient cli, String dbName, String tabName, List<HCatFieldSchema> partitionCols) throws HCatException{ |
| |
| ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>(); |
| cols.add(HCatUtil.getStringSchema("id", "id comment")); |
| HCatCreateTableDesc tableDesc = HCatCreateTableDesc |
| .create(dbName, tabName, cols) |
| .partCols(partitionCols) |
| .fileFormat("textfile") |
| .ifNotExists(true) |
| .isTableExternal(true) |
| .build(); |
| cli.createTable(tableDesc); |
| } |
| } |