blob: ac91d59d9dbafcd502afea6986a92f1db49d72ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.lineage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.asserts.SoftAssert;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Test bulk API for dashboard needs.
*/
@Test(groups = "embedded")
public class EntitySummaryTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(EntitySummaryTest.class);
private ColoHelper cluster1 = servers.get(0);
private OozieClient cluster1OC = serverOC.get(0);
private OozieClient cluster2OC = serverOC.get(1);
private FileSystem cluster1FS = serverFS.get(0);
private String baseTestHDFSDir = cleanAndGetTestDir();
private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
private String sourcePath = baseTestHDFSDir + "/source";
private String feedDataLocation = baseTestHDFSDir + "/source" + MINUTE_DATE_PATTERN;
private String targetPath = baseTestHDFSDir + "/target";
private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
private String startTime, endTime;
private SoftAssert softAssert;
@BeforeMethod(alwaysRun = true)
public void prepareData() throws IOException {
uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
Bundle bundle = BundleUtil.readELBundle();
for (int i = 0; i <= 1; i++) {
bundles[i] = new Bundle(bundle, servers.get(i));
bundles[i].generateUniqueBundle(this);
bundles[i].setProcessWorkflow(aggregateWorkflowDir);
bundles[i].setInputFeedDataPath(feedDataLocation);
}
startTime = TimeUtil.getTimeWrtSystemTime(-35);
endTime = TimeUtil.getTimeWrtSystemTime(5);
LOGGER.info("Time range is between : " + startTime + " and " + endTime);
softAssert = new SoftAssert();
}
@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
cleanTestsDirs();
removeTestClassEntities();
}
/**
* Get status of 7 processes and 7 instances of each process. The call should give correct
* information, instance info must be recent.
*/
@Test
public void getProcessSummary() throws Exception {
//prepare process
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
String clusterName = Util.readEntityName(bundles[0].getClusters().get(0));
List<String> processes = scheduleEntityValidateWaitingInstances(cluster1OC,
bundles[0].getProcessData(), EntityType.PROCESS, clusterName);
//create data for processes to run and wait some time for instances to make progress
OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, processes.get(0), 0);
InstanceUtil.waitTillInstanceReachState(cluster1OC, processes.get(0), 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
//compare summary and instance result for each of 7 processes
validateProgressingInstances(processes, EntityType.PROCESS, clusterName);
softAssert.assertAll();
}
/**
* Get status of 7 feeds and 7 instances of each feed the call should give correct information,
* instance info must be recent.
*/
@Test(groups = "multiCluster")
public void getFeedSummary() throws Exception {
//prepare feed template.
bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
bundles[0].setInputFeedDataPath(feedDataLocation);
String feed = bundles[0].getInputFeedFromBundle();
String cluster1Def = bundles[0].getClusters().get(0);
String cluster2Def = bundles[1].getClusters().get(0);
//erase all clusters from feed definition
feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
//set cluster1 as source
feed = FeedMerlin.fromString(feed).addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(cluster1Def))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.SOURCE)
.build()).toString();
//set cluster2 as target
feed = FeedMerlin.fromString(feed).addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(cluster2Def))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.TARGET)
.withDataLocation(targetDataLocation)
.build()).toString();
String clusterName = Util.readEntityName(cluster2Def);
//submit clusters
AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster1Def));
AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster2Def));
//submit and schedule 7 feeds, check that 7 waiting instances are present for each feed
List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2OC, feed, EntityType.FEED, clusterName);
//create data for processes to run and wait some time for instances to make progress
List<String> folders = TimeUtil.getMinuteDatesOnEitherSide(TimeUtil.oozieDateToDate(
startTime), TimeUtil.oozieDateToDate(endTime), 1);
HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.NORMAL_INPUT, sourcePath, folders);
InstanceUtil.waitTillInstanceReachState(cluster2OC, feeds.get(0), 1,
CoordinatorAction.Status.RUNNING, EntityType.FEED);
//compare summary and instance result for each of 7 processes
validateProgressingInstances(feeds, EntityType.FEED, clusterName);
softAssert.assertAll();
}
/*
* Schedules 7 entities and checks that summary reflects info about the most recent 7
* instances of each of them.
*/
private List<String> scheduleEntityValidateWaitingInstances(OozieClient oozieClient, String entity,
EntityType entityType, String clusterName)
throws AuthenticationException, IOException, URISyntaxException, JAXBException,
OozieClientException, InterruptedException {
String entityName = Util.readEntityName(entity);
AbstractEntityHelper helper;
List<String> names = new ArrayList<>();
for (int i = 1; i <= 7; i++) {
String uniqueName = entityName + i;
names.add(uniqueName);
Entity entityMerlin;
if (entityType == EntityType.FEED) {
helper = prism.getFeedHelper();
entityMerlin = new FeedMerlin(entity);
((FeedMerlin) entityMerlin).setName(uniqueName);
} else {
helper = prism.getProcessHelper();
entityMerlin = new ProcessMerlin(entity);
((ProcessMerlin) entityMerlin).setName(uniqueName);
}
entity = entityMerlin.toString();
AssertUtil.assertSucceeded(helper.submitAndSchedule(entity));
InstanceUtil.waitTillInstancesAreCreated(oozieClient, entity, 0);
InstanceUtil.waitTillInstanceReachState(oozieClient,
uniqueName, 7, CoordinatorAction.Status.WAITING, entityType);
//check that summary shows recent (i) number of feeds and their instances
EntitySummaryResult summary = helper.getEntitySummary(clusterName, null)
.getEntitySummaryResult();
EntitySummaryResult.EntitySummary[] entitiesSummary = summary.getEntitySummaries();
softAssert.assertEquals(entitiesSummary.length, i, "Summary must contain info "
+ "about exact number of feeds.");
for (EntitySummaryResult.EntitySummary entitySummary : entitiesSummary) {
String status = entitySummary.getStatus();
softAssert.assertTrue(status.equals("RUNNING") || status.equals("SUBMITTED"),
"Unexpected entity status : " + status);
softAssert.assertTrue(names.contains(entitySummary.getName()),
"Unexpected entity name: " + entitySummary.getName());
softAssert.assertEquals(entitySummary.getInstances().length, 7,
"Unexpected number of instances.");
for (EntitySummaryResult.Instance instance : entitySummary.getInstances()) {
softAssert.assertEquals(instance.getStatus(),
EntitySummaryResult.WorkflowStatus.WAITING,
"Unexpected instance status.");
softAssert.assertEquals(instance.getCluster(), clusterName,
"Invalid cluster in summary.");
}
}
}
return names;
}
/*
* Retrieves the most resent info from instanceStatus and entitySummary API and compares them.
* Summary for each entity should contain 7 instances and that instances should be the same
* as in response of instancesResult request.
*/
private void validateProgressingInstances(List<String> names, EntityType entityType,
String clusterName)
throws AuthenticationException, IOException, URISyntaxException, InterruptedException {
InstancesResult r;
AbstractEntityHelper helper;
if (entityType == EntityType.FEED) {
helper = prism.getFeedHelper();
} else {
helper = prism.getProcessHelper();
}
for (String entityName : names) {
LOGGER.info("Working with " + entityType + " : " + entityName);
//get recent instances info by -getStatus API
r = helper.getProcessInstanceStatus(entityName, null);
InstancesResult.Instance[] instancesFromStatus = r.getInstances();
LOGGER.info("Instances from -getStatus API: " + Arrays.toString(instancesFromStatus));
//get recent instances info by -summary API
EntitySummaryResult.EntitySummary[] summaries = helper.getEntitySummary(clusterName, null)
.getEntitySummaryResult().getEntitySummaries();
EntitySummaryResult.EntitySummary summaryItem = null;
//get instances for specific process
for (EntitySummaryResult.EntitySummary item : summaries) {
if (item.getName().equals(entityName)) {
summaryItem = item;
break;
}
}
Assert.assertNotNull(summaryItem, "Appropriate summary not found for : " + entityName);
EntitySummaryResult.Instance[] instancesFromSummary = summaryItem.getInstances();
LOGGER.info("Instances from SummaryResult: " + Arrays.toString(instancesFromSummary));
softAssert.assertEquals(instancesFromSummary.length, 7, "7 instances should be present in "
+ "summary.");
for (EntitySummaryResult.Instance instanceFromSummary : instancesFromSummary) {
softAssert.assertTrue(containsSummaryInstance(instancesFromStatus, instanceFromSummary),
String.format("Instance {%s;%s;%s} is absent in list (or differs by its properties)\n%s",
instanceFromSummary.getInstance(), instanceFromSummary.getStatus(),
instanceFromSummary.getCluster(), Arrays.toString(instancesFromStatus)));
}
}
}
/*
* Checks if array of instances of InstancesResult contains particular instance
* from SummaryResult by common properties.
*/
private boolean containsSummaryInstance(InstancesResult.Instance[] instancesFromStatus,
EntitySummaryResult.Instance instanceFromSummary) {
for (InstancesResult.Instance instanceFromStatus : instancesFromStatus) {
if (instanceFromStatus.getInstance().equals(instanceFromSummary.getInstance())) {
String status1 = instanceFromStatus.getStatus().toString();
String status2 = instanceFromSummary.getStatus().toString();
if (!status1.equals(status2)) {
LOGGER.info(String.format("Statuses comparison failed : %s and %s", status1, status2));
return false;
}
if (!instanceFromStatus.getCluster().equals(instanceFromSummary.getCluster())) {
LOGGER.info(String.format("Clusters comparison failed : %s and %s",
instanceFromStatus.getCluster(), instanceFromSummary.getCluster()));
return false;
}
return true;
}
}
LOGGER.info("Instance " + instanceFromSummary.getInstance() + " not found in list.");
return false;
}
/**
* Test entitySummary optional params. Schedule few processes and get summary using
* different optional params. Check that responses match to expected.
*
* @throws URISyntaxException
* @throws IOException
* @throws AuthenticationException
* @throws JAXBException
*/
@Test
public void getSummaryFilterBy()
throws URISyntaxException, IOException, AuthenticationException, JAXBException,
InterruptedException {
//prepare process template
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
String clusterName = Util.readEntityName(bundles[0].getClusters().get(0));
String originName = bundles[0].getProcessName();
List<String> pNames = new ArrayList<>();
//schedule 3 processes with different pipelines. 1st and 3d processes have same tag value.
for (int i = 1; i <= 3; i++) {
String uniqueName = originName + "-" + i;
pNames.add(uniqueName);
String pipeline = "pipeline_" + i;
bundles[0].setProcessName(uniqueName);
bundles[0].setProcessPipeline(pipeline);
if (i == 2) {
bundles[0].setProcessTags("value=2");
} else {
bundles[0].setProcessTags("value=1");
}
String process = bundles[0].getProcessData();
AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process));
}
//test filterBy name option. Use first process name. Expecting 1 summary be returned.
EntitySummaryResult.EntitySummary[] summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=NAME:" + pNames.get(0))
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 1, "There should be single process filtered by name.");
softAssert.assertEquals(summaries[0].getName(), pNames.get(0), "Invalid process was returned.");
//suspend one process and test filterBy status option for both running and suspended processes.
bundles[0].setProcessName(pNames.get(0));
AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(bundles[0].getProcessData()));
summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=STATUS:SUSPENDED")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 1, "There should be single SUSPENDED process filtered.");
softAssert.assertEquals(summaries[0].getName(), pNames.get(0),
"Summary shows invalid suspended process.");
//lets use the same request for RUNNING processes, adding orderBy option and
//sortOrder=asc option, check result
summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=STATUS:RUNNING&orderBy=name&sortOrder=asc")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 2, "Invalid RUNNING processes number.");
softAssert.assertEquals(summaries[0].getName(), pNames.get(1), "Another process expected.");
softAssert.assertEquals(summaries[1].getName(), pNames.get(2), "Another process expected.");
//lets use the same request adding orderBy option and sortOrder=desc option, check result
summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=STATUS:RUNNING&orderBy=name&sortOrder=desc")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 2, "Invalid RUNNING processes number.");
softAssert.assertEquals(summaries[0].getName(), pNames.get(2), "Another process expected.");
softAssert.assertEquals(summaries[1].getName(), pNames.get(1), "Another process expected.");
//lets use original request adding numResults option. Single summary should be returned
summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=STATUS:RUNNING&numResults=1")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 1, "Single process should be shown.");
softAssert.assertEquals(summaries[0].getStatus(), "RUNNING", "Wrong status was returned.");
softAssert.assertEquals(summaries[0].getInstances().length, 7, "Invalid instances number.");
//use the same request adding numInstances option, check instances number differs
summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=STATUS:RUNNING&numResults=1&numInstances=2")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 1, "Single process should be shown.");
softAssert.assertEquals(summaries[0].getStatus(), "RUNNING", "Wrong status was returned.");
softAssert.assertEquals(summaries[0].getInstances().length, 2, "Invalid instances number.");
//filterBy RUNNING status AND specific pipeline name of one of the running processes.
//Result should reflect 'AND' logic and return the only process with correct pipeline
//name which matches to a given one in request. Also we should use 'fields' option to get
//pipelines listed in entity summary.
summaries = prism.getProcessHelper().getEntitySummary(clusterName,
"filterBy=STATUS:RUNNING,PIPELINES:pipeline_2&fields=PIPELINES")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 1, "Single process should be shown.");
softAssert.assertEquals(summaries[0].getName(), pNames.get(1), "Invalid process name returned");
softAssert.assertEquals(summaries[0].getPipelines()[0], "pipeline_2",
"Summary returned process with a wrong pipeline name.");
//test 'tags' option. 1st and 3d processes should be returned in ascending order.
summaries = prism.getProcessHelper()
.getEntitySummary(clusterName, "tags=value=1&orderBy=name&sortOrder=asc")
.getEntitySummaryResult().getEntitySummaries();
softAssert.assertEquals(summaries.length, 2, "Wrong number of processes.");
softAssert.assertEquals(summaries[0].getName(), pNames.get(0), "Wrong order.");
softAssert.assertEquals(summaries[0].getStatus(), "SUSPENDED", "Wrong status.");
softAssert.assertEquals(summaries[1].getName(), pNames.get(2), "Wrong order.");
softAssert.assertEquals(summaries[1].getStatus(), "RUNNING", "Wrong status.");
//attempt to use invalid option or values should not return any summaries.
EntitySummaryResult summary = prism.getProcessHelper()
.getEntitySummary(clusterName, "filterBy=STATUS:STUCK")
.getEntitySummaryResult();
softAssert.assertEquals(summary.getStatus(), APIResult.Status.SUCCEEDED);
softAssert.assertEquals(summary.getMessage(), "Entity Summary Result", "Invalid message.");
softAssert.assertEquals(summary.getEntitySummaries(), null, "Entity summary should be null.");
softAssert.assertAll();
}
}