blob: ed0282e18162e4958cd942609923d8f8d9631482 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.lineage;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.falcon.regression.Entities.ClusterMerlin;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.helpers.LineageHelper;
import org.apache.falcon.regression.core.response.lineage.Direction;
import org.apache.falcon.regression.core.response.lineage.Edge;
import org.apache.falcon.regression.core.response.lineage.EdgesResult;
import org.apache.falcon.regression.core.response.lineage.Vertex;
import org.apache.falcon.regression.core.response.lineage.VertexIdsResult;
import org.apache.falcon.regression.core.response.lineage.VertexResult;
import org.apache.falcon.regression.core.response.lineage.VerticesResult;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.CleanupUtil;
import org.apache.falcon.regression.core.util.Generator;
import org.apache.falcon.regression.core.util.GraphAssert;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.http.HttpResponse;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
@Test(groups = "lineage-rest")
public class LineageApiTest extends BaseTestClass {
private static final Logger logger = Logger.getLogger(LineageApiTest.class);
private static final String testName = "LineageApiTest";
private static final String testTag =
Edge.LEBEL_TYPE.TESTNAME.toString().toLowerCase() + "=" + testName;
private static final String VERTEX_NOT_FOUND_REGEX = ".*Vertex.*%d.*not.*found.*\n?";
private static final String inValidArgumentStr = "Invalid argument";
LineageHelper lineageHelper;
final ColoHelper cluster = servers.get(0);
final String baseTestHDFSDir = baseHDFSDir + "/LineageApiTest";
final String feedInputPath = baseTestHDFSDir + "/input";
final String feedOutputPath = baseTestHDFSDir + "/output";
// use 5 <= x < 10 input feeds
final int numInputFeeds = 5 + new Random().nextInt(5);
// use 5 <= x < 10 output feeds
final int numOutputFeeds = 5 + new Random().nextInt(5);
ClusterMerlin clusterMerlin;
FeedMerlin[] inputFeeds;
FeedMerlin[] outputFeeds;
@BeforeClass(alwaysRun = true)
public void init() {
lineageHelper = new LineageHelper(prism);
}
@BeforeMethod(alwaysRun = true, firstTimeOnly = true)
public void setUp() throws Exception {
CleanupUtil.cleanAllEntities(prism);
Bundle bundle = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundle.generateUniqueBundle();
bundles[0] = new Bundle(bundle, cluster);
final List<String> clusterStrings = bundles[0].getClusters();
Assert.assertEquals(clusterStrings.size(), 1, "Expecting only 1 clusterMerlin.");
clusterMerlin = new ClusterMerlin(clusterStrings.get(0));
clusterMerlin.setTags(testTag);
AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(clusterMerlin.toString()));
logger.info("numInputFeeds = " + numInputFeeds);
logger.info("numOutputFeeds = " + numOutputFeeds);
final FeedMerlin inputMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle());
inputMerlin.setTags(testTag);
inputFeeds = generateFeeds(numInputFeeds, inputMerlin,
Generator.getNameGenerator("infeed", inputMerlin.getName()),
Generator.getHadoopPathGenerator(feedInputPath, MINUTE_DATE_PATTERN));
for (FeedMerlin feed : inputFeeds) {
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString()));
}
FeedMerlin outputMerlin = new FeedMerlin(bundles[0].getOutputFeedFromBundle());
outputMerlin.setTags(testTag);
outputFeeds = generateFeeds(numOutputFeeds, outputMerlin,
Generator.getNameGenerator("outfeed", outputMerlin.getName()),
Generator.getHadoopPathGenerator(feedOutputPath, MINUTE_DATE_PATTERN));
for (FeedMerlin feed : outputFeeds) {
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString()));
}
}
public static FeedMerlin[] generateFeeds(final int numInputFeeds,
final FeedMerlin originalFeedMerlin,
final Generator nameGenerator,
final Generator pathGenerator) {
FeedMerlin[] inputFeeds = new FeedMerlin[numInputFeeds];
//submit all input feeds
for(int count = 0; count < numInputFeeds; ++count) {
final FeedMerlin feed = new FeedMerlin(originalFeedMerlin.toString());
feed.setName(nameGenerator.generate());
feed.setLocation(LocationType.DATA, pathGenerator.generate());
inputFeeds[count] = feed;
}
return inputFeeds;
}
@AfterMethod(alwaysRun = true, lastTimeOnly = true)
public void tearDown() {
for (FeedMerlin inputFeed : inputFeeds) {
CleanupUtil.deleteQuietly(prism.getFeedHelper(), inputFeed.toString());
}
for (FeedMerlin outputFeed : outputFeeds) {
CleanupUtil.deleteQuietly(prism.getFeedHelper(), outputFeed.toString());
}
removeBundles();
}
/**
* Get all vertices from falcon and check that they are sane
* @throws Exception
*/
@Test
public void testAllVertices() throws Exception {
final VerticesResult verticesResult = lineageHelper.getAllVertices();
logger.info(verticesResult);
GraphAssert.assertVertexSanity(verticesResult);
GraphAssert.assertUserVertexPresence(verticesResult);
GraphAssert.assertVerticesPresenceMinOccur(verticesResult, Vertex.VERTEX_TYPE.COLO, 1);
GraphAssert.assertVerticesPresenceMinOccur(verticesResult, Vertex.VERTEX_TYPE.TAGS, 1);
GraphAssert.assertVerticesPresenceMinOccur(verticesResult, Vertex.VERTEX_TYPE.CLUSTER_ENTITY, 1);
GraphAssert.assertVerticesPresenceMinOccur(verticesResult,
Vertex.VERTEX_TYPE.FEED_ENTITY, numInputFeeds + numOutputFeeds);
}
/**
* Get a vertex by id and check results
* @throws Exception
*/
@Test
public void testVertexId() throws Exception {
final VerticesResult userResult =
lineageHelper.getVerticesByName(MerlinConstants.CURRENT_USER_NAME);
GraphAssert.assertVertexSanity(userResult);
final int vertexId = userResult.getResults().get(0).get_id();
final VertexResult userVertex =
lineageHelper.getVertexById(vertexId);
Assert.assertEquals(userResult.getResults().get(0), userVertex.getResults(),
"Same vertex should have been returned.");
}
/**
* Negative test - get a vertex without specifying id, we should not get internal server error
* @throws Exception
*/
@Test
public void testVertexNoId() throws Exception {
HttpResponse response = lineageHelper.runGetRequest(
lineageHelper.getUrl(LineageHelper.URL.VERTICES, ""));
String responseString = lineageHelper.getResponseString(response);
logger.info("response: " + response);
logger.info("responseString: " + responseString);
Assert.assertNotEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_INTERNAL_SERVER_ERROR,
"We should not get internal server error");
}
/**
* Negative test - get a vertex specifying an invalid id, we should not get http non-found error
* @throws Exception
*/
@Test
public void testVertexInvalidId() throws Exception {
final VerticesResult allVerticesResult =
lineageHelper.getAllVertices();
GraphAssert.assertVertexSanity(allVerticesResult);
int invalidVertexId = -1;
for (Vertex vertex : allVerticesResult.getResults()) {
if(invalidVertexId <= vertex.get_id()) {
invalidVertexId = vertex.get_id() + 1;
}
}
HttpResponse response = lineageHelper.runGetRequest(
lineageHelper.getUrl(LineageHelper.URL.VERTICES, "" + invalidVertexId));
String responseString = lineageHelper.getResponseString(response);
logger.info("response: " + response);
logger.info("responseString: " + responseString);
Assert.assertTrue(
responseString.matches(String.format(VERTEX_NOT_FOUND_REGEX, invalidVertexId)),
"Unexpected responseString: " + responseString);
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_NOT_FOUND,
"We should get http not found error");
}
/**
* Get properties of one type of vertex and check those properties
* @param vertexType type of the vertex that we want to check
*/
private void checkVertexOneProperty(Vertex.VERTEX_TYPE vertexType) {
final VerticesResult coloResult = lineageHelper.getVerticesByType(vertexType);
GraphAssert.assertVertexSanity(coloResult);
for (Vertex coloVertex : coloResult.getResults()) {
final int coloVertexId = coloVertex.get_id();
final VertexResult coloProperties = lineageHelper.getVertexProperties(coloVertexId);
Assert.assertNotNull(coloProperties.getResults().getName(),
"name should not be null");
Assert.assertEquals(coloProperties.getResults().getType(), vertexType);
Assert.assertNotNull(coloProperties.getResults().getTimestamp(),
"timestamp should not be null");
}
}
/**
* Test vertex properties for different types of vertices
* @throws Exception
*/
@Test
public void testVertexProperties() throws Exception {
//testing properties of a user vertex
checkVertexOneProperty(Vertex.VERTEX_TYPE.USER);
//testing properties of colo vertices
checkVertexOneProperty(Vertex.VERTEX_TYPE.COLO);
//testing properties of group vertices
checkVertexOneProperty(Vertex.VERTEX_TYPE.GROUPS);
//testing properties of group vertices
//checkVertexOneProperty(Vertex.VERTEX_TYPE.FEED_ENTITY);
}
/**
* Test vertex properties supplying a blank id, expecting http not found error
* @throws Exception
*/
@Test
public void testVertexPropertiesNoId() throws Exception {
//testing properties of a user vertex
HttpResponse response = lineageHelper.runGetRequest(lineageHelper
.getUrl(LineageHelper.URL.VERTICES_PROPERTIES, lineageHelper.getUrlPath("")));
String responseString = lineageHelper.getResponseString(response);
logger.info("response: " + response);
logger.info("responseString: " + responseString);
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_NOT_FOUND, "We should get http not found error");
}
/**
* Test vertex properties supplying an invalid id, expecting http not found error
* @throws Exception
*/
@Test
public void testVertexPropertiesInvalidId() throws Exception {
final VerticesResult allVerticesResult =
lineageHelper.getAllVertices();
GraphAssert.assertVertexSanity(allVerticesResult);
int invalidVertexId = -1;
for (Vertex vertex : allVerticesResult.getResults()) {
if(invalidVertexId <= vertex.get_id()) {
invalidVertexId = vertex.get_id() + 1;
}
}
HttpResponse response = lineageHelper.runGetRequest(
lineageHelper.getUrl(LineageHelper.URL.VERTICES_PROPERTIES, "" + invalidVertexId));
String responseString = lineageHelper.getResponseString(response);
logger.info("response: " + response);
logger.info("responseString: " + responseString);
Assert.assertTrue(
responseString.matches(String.format(VERTEX_NOT_FOUND_REGEX, invalidVertexId)),
"Unexpected responseString: " + responseString);
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_NOT_FOUND,
"We should get http not found error");
}
/**
* Test filtering vertices by name
* @throws Exception
*/
@Test
public void testVerticesFilterByName() throws Exception {
final String clusterName = clusterMerlin.getName();
final VerticesResult clusterVertices = lineageHelper.getVerticesByName(clusterName);
GraphAssert.assertVertexSanity(clusterVertices);
GraphAssert.assertVerticesPresenceMinOccur(clusterVertices,
Vertex.VERTEX_TYPE.CLUSTER_ENTITY, 1);
GraphAssert.assertVertexPresence(clusterVertices, clusterName);
for(int i = 0; i < numInputFeeds; ++i) {
final String feedName = inputFeeds[i].getName();
final VerticesResult feedVertices = lineageHelper.getVerticesByName(feedName);
GraphAssert.assertVertexSanity(feedVertices);
GraphAssert.assertVerticesPresenceMinOccur(feedVertices,
Vertex.VERTEX_TYPE.FEED_ENTITY, 1);
GraphAssert.assertVertexPresence(feedVertices, feedName);
}
for(int i = 0; i < numOutputFeeds; ++i) {
final String feedName = outputFeeds[i].getName();
final VerticesResult feedVertices = lineageHelper.getVerticesByName(feedName);
GraphAssert.assertVertexSanity(feedVertices);
GraphAssert.assertVerticesPresenceMinOccur(feedVertices,
Vertex.VERTEX_TYPE.FEED_ENTITY, 1);
GraphAssert.assertVertexPresence(feedVertices, feedName);
}
}
/**
* Test filtering vertices by type
* @throws Exception
*/
@Test
public void testVerticesFilterByType() throws Exception {
final VerticesResult clusterVertices =
lineageHelper.getVerticesByType(Vertex.VERTEX_TYPE.CLUSTER_ENTITY);
GraphAssert.assertVertexSanity(clusterVertices);
GraphAssert.assertVerticesPresenceMinOccur(clusterVertices,
Vertex.VERTEX_TYPE.CLUSTER_ENTITY, 1);
GraphAssert.assertVertexPresence(clusterVertices, clusterMerlin.getName());
final VerticesResult feedVertices =
lineageHelper.getVerticesByType(Vertex.VERTEX_TYPE.FEED_ENTITY);
GraphAssert.assertVertexSanity(feedVertices);
GraphAssert.assertVerticesPresenceMinOccur(feedVertices,
Vertex.VERTEX_TYPE.FEED_ENTITY, 1);
for (FeedMerlin oneFeed : inputFeeds) {
GraphAssert.assertVertexPresence(feedVertices, oneFeed.getName());
}
for (FeedMerlin oneFeed : outputFeeds) {
GraphAssert.assertVertexPresence(feedVertices, oneFeed.getName());
}
}
/**
* Test filtering vertices when no output is produced
* @throws Exception
*/
@Test
public void testVerticesFilterNoOutput() throws Exception {
final String nonExistingName = "this-is-a-non-existing-name";
final VerticesResult clusterVertices = lineageHelper.getVerticesByName(nonExistingName);
GraphAssert.assertVertexSanity(clusterVertices);
Assert.assertEquals(clusterVertices.getTotalSize(), 0,
"Result should not contain any vertex");
}
@Test
public void testVerticesFilterBlankValue() throws Exception {
Map<String, String> params = new TreeMap<String, String>();
params.put("key", Vertex.FilterKey.name.toString());
params.put("value", "");
HttpResponse response = lineageHelper
.runGetRequest(lineageHelper.getUrl(LineageHelper.URL.VERTICES, params));
String responseString = lineageHelper.getResponseString(response);
logger.info(responseString);
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_BAD_REQUEST,
"The get request was a bad request");
Assert.assertTrue(responseString.contains(inValidArgumentStr),
"Result should contain string Invalid argument");
}
@Test
public void testVerticesFilterBlankKey() throws Exception {
Map<String, String> params = new TreeMap<String, String>();
params.put("key", "");
params.put("value", "someValue");
HttpResponse response = lineageHelper.runGetRequest(
lineageHelper.getUrl(LineageHelper.URL.VERTICES, params));
String responseString = lineageHelper.getResponseString(response);
logger.info(responseString);
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_BAD_REQUEST,
"The get request was a bad request");
Assert.assertTrue(responseString.contains(inValidArgumentStr),
"Result should contain string Invalid argument");
}
@Test
public void testVertexDirectionFetchEdges() throws Exception {
final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
final EdgesResult bothEdges =
lineageHelper.getEdgesByDirection(clusterVertexId, Direction.bothEdges);
GraphAssert.assertEdgeSanity(bothEdges);
Assert.assertEquals(bothEdges.filterByType(Edge.LEBEL_TYPE.STORED_IN).size(),
inputFeeds.length + outputFeeds.length,
"There should be edge between the cluster and inputFeeds, outputFeeds");
Assert.assertEquals(bothEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).size(),
1, "There should be an edge from the cluster to colo");
Assert.assertEquals(bothEdges.getTotalSize(), inputFeeds.length + outputFeeds.length + 2,
"There should be edge from the cluster to inputFeeds & outputFeeds," +
" one between cluster and colo, one between cluster and classification");
final EdgesResult inComingEdges =
lineageHelper.getEdgesByDirection(clusterVertexId, Direction.inComingEdges);
GraphAssert.assertEdgeSanity(inComingEdges);
Assert.assertEquals(inComingEdges.getTotalSize(), inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds");
Assert.assertEquals(inComingEdges.filterByType(Edge.LEBEL_TYPE.STORED_IN).size(),
inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds");
final EdgesResult outGoingEdges =
lineageHelper.getEdgesByDirection(clusterVertexId, Direction.outGoingEdges);
GraphAssert.assertEdgeSanity(outGoingEdges);
Assert.assertEquals(outGoingEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).size(),
1, "There should be an edge from the cluster to colo");
Assert.assertEquals(outGoingEdges.filterByType(Edge.LEBEL_TYPE.TESTNAME).size(),
1, "There should be an edge from the cluster to classification");
Assert.assertEquals(outGoingEdges.getTotalSize(), 2,
"There should be an edge from the cluster to colo");
}
@Test
public void testVertexCountsFetchVertices() throws Exception {
final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
final VerticesResult bothVertices =
lineageHelper.getVerticesByDirection(clusterVertexId, Direction.bothVertices);
GraphAssert.assertVertexSanity(bothVertices);
Assert.assertEquals(bothVertices.filterByType(Vertex.VERTEX_TYPE.FEED_ENTITY).size(),
inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds");
Assert.assertEquals(bothVertices.filterByType(Vertex.VERTEX_TYPE.COLO).size(), 1,
"The should be one edge between cluster and colo");
Assert.assertEquals(bothVertices.getTotalSize(),
inputFeeds.length + outputFeeds.length + 2,
"There should be edge from the cluster to inputFeeds & outputFeeds," +
" one between cluster and colo, one between cluster and classification");
final VerticesResult inComingVertices =
lineageHelper.getVerticesByDirection(clusterVertexId, Direction.inComingVertices);
GraphAssert.assertVertexSanity(inComingVertices);
Assert.assertEquals(inComingVertices.filterByType(Vertex.VERTEX_TYPE.FEED_ENTITY).size(),
inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds");
Assert.assertEquals(inComingVertices.getTotalSize(),
inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds and one " +
"between cluster and colo");
final VerticesResult outgoingVertices =
lineageHelper.getVerticesByDirection(clusterVertexId, Direction.outgoingVertices);
GraphAssert.assertVertexSanity(outgoingVertices);
Assert.assertEquals(outgoingVertices.filterByType(Vertex.VERTEX_TYPE.COLO).size(), 1,
"The should be one edge between cluster and colo");
Assert.assertEquals(outgoingVertices.filterByName(testName).size(),
1, "There should be an edge from the cluster to classification");
Assert.assertEquals(outgoingVertices.getTotalSize(), 2,
"There should be an edge from the cluster to colo");
}
@Test
public void testVertexDirectionFetchCounts() throws Exception {
final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
final VerticesResult bothCount =
lineageHelper.getVerticesByDirection(clusterVertexId, Direction.bothCount);
Assert.assertEquals(bothCount.getTotalSize(),
inputFeeds.length + outputFeeds.length + 2,
"There should be edge from the cluster to inputFeeds & outputFeeds," +
" one between cluster and colo, one between cluster and classification");
final VerticesResult inCount =
lineageHelper.getVerticesByDirection(clusterVertexId, Direction.inCount);
Assert.assertEquals(inCount.getTotalSize(),
inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds and one " +
"between cluster and colo");
final VerticesResult outCount =
lineageHelper.getVerticesByDirection(clusterVertexId, Direction.outCount);
Assert.assertEquals(outCount.getTotalSize(), 2,
"There should be an edge from the cluster to colo");
}
@Test
public void testVertexDirectionFetchVertexIds() throws Exception {
final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
final VertexIdsResult bothVerticesIds =
lineageHelper.getVertexIdsByDirection(clusterVertexId, Direction.bothVerticesIds);
for (Integer vertexId : bothVerticesIds.getResults()) {
Assert.assertTrue(vertexId > 0, "Vertex id should be valid.");
}
Assert.assertEquals(bothVerticesIds.getTotalSize(),
inputFeeds.length + outputFeeds.length + 2,
"There should be edge from the cluster to inputFeeds & outputFeeds," +
" one between cluster and colo, one between cluster and classification");
final VertexIdsResult incomingVerticesIds =
lineageHelper.getVertexIdsByDirection(clusterVertexId, Direction.incomingVerticesIds);
for (Integer vertexId : incomingVerticesIds.getResults()) {
Assert.assertTrue(vertexId > 0, "Vertex id should be valid.");
}
Assert.assertEquals(incomingVerticesIds.getTotalSize(),
inputFeeds.length + outputFeeds.length,
"There should be edge from the cluster to inputFeeds & outputFeeds and one " +
"between cluster and colo");
final VertexIdsResult outgoingVerticesIds =
lineageHelper.getVertexIdsByDirection(clusterVertexId, Direction.outgoingVerticesIds);
for (Integer vertexId : outgoingVerticesIds.getResults()) {
Assert.assertTrue(vertexId > 0, "Vertex id should be valid.");
}
Assert.assertEquals(outgoingVerticesIds.getTotalSize(), 2,
"There should be an edge from the cluster to colo and one from cluster to " +
"classification");
}
@Test
public void testVertexBadDirection() throws Exception {
final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
HttpResponse response = lineageHelper
.runGetRequest(lineageHelper.getUrl(LineageHelper.URL.VERTICES,
lineageHelper.getUrlPath(clusterVertexId, "badDirection")));
final String responseString = lineageHelper.getResponseString(response);
logger.info("response: " + response);
logger.info("responseString: " + responseString);
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_BAD_REQUEST,
"We should not get internal server error");
}
@Test
public void testAllEdges() throws Exception {
final EdgesResult edgesResult = lineageHelper.getAllEdges();
logger.info(edgesResult);
Assert.assertTrue(edgesResult.getTotalSize() > 0, "Total number of edges should be" +
" greater that zero but is: " + edgesResult.getTotalSize());
GraphAssert.assertEdgeSanity(edgesResult);
GraphAssert.assertEdgePresenceMinOccur(edgesResult, Edge.LEBEL_TYPE.CLUSTER_COLO, 1);
GraphAssert.assertEdgePresenceMinOccur(edgesResult, Edge.LEBEL_TYPE.STORED_IN,
numInputFeeds + numOutputFeeds);
GraphAssert.assertEdgePresenceMinOccur(edgesResult, Edge.LEBEL_TYPE.OWNED_BY,
1 + numInputFeeds + numOutputFeeds);
}
@Test
public void testEdge() throws Exception {
final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
final EdgesResult outGoingEdges =
lineageHelper.getEdgesByDirection(clusterVertexId, Direction.outGoingEdges);
GraphAssert.assertEdgeSanity(outGoingEdges);
Assert.assertEquals(outGoingEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).size(),
1, "There should be an edge from the cluster to colo");
final String clusterColoEdgeId =
outGoingEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).get(0).get_id();
final Edge clusterColoEdge =
lineageHelper.getEdgeById(clusterColoEdgeId).getResults();
GraphAssert.assertEdgeSanity(clusterColoEdge);
}
@Test
public void testEdgeBlankId() throws Exception {
final HttpResponse httpResponse = lineageHelper.runGetRequest(
lineageHelper.getUrl(LineageHelper.URL.EDGES, lineageHelper.getUrlPath("")));
logger.info(httpResponse.toString());
logger.info(lineageHelper.getResponseString(httpResponse));
Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(),
HttpStatus.SC_NOT_FOUND,
"Expecting not-found error.");
}
@Test
public void testEdgeInvalidId() throws Exception {
final HttpResponse response = lineageHelper.runGetRequest(
lineageHelper.getUrl(LineageHelper.URL.EDGES, lineageHelper.getUrlPath("invalid-id")));
logger.info(response.toString());
logger.info(lineageHelper.getResponseString(response));
Assert.assertEquals(response.getStatusLine().getStatusCode(),
HttpStatus.SC_NOT_FOUND,
"Expecting not-found error.");
}
@Test
public void testColoToClusterNode() throws Exception {
final VerticesResult verticesResult = lineageHelper.getVerticesByType(
Vertex.VERTEX_TYPE.COLO);
GraphAssert.assertVertexSanity(verticesResult);
Assert.assertTrue(verticesResult.getTotalSize() > 0, "Expected at least 1 colo node");
Assert.assertTrue(verticesResult.getTotalSize() <= 3, "Expected at most 3 colo nodes");
final List<Vertex> colo1Vertex = verticesResult.filterByName(clusterMerlin.getColo());
AssertUtil.checkForListSize(colo1Vertex, 1);
Vertex coloVertex = colo1Vertex.get(0);
logger.info("coloVertex: " + coloVertex);
final VerticesResult verticesByDirection =
lineageHelper.getVerticesByDirection(coloVertex.get_id(), Direction.inComingVertices);
AssertUtil.checkForListSize(
verticesByDirection.filterByName(clusterMerlin.getName()), 1);
}
@Test
public void testClusterNodeToFeedNode() throws Exception {
final VerticesResult clusterResult = lineageHelper.getVerticesByName(
clusterMerlin.getName());
GraphAssert.assertVertexSanity(clusterResult);
Vertex clusterVertex = clusterResult.getResults().get(0);
final VerticesResult clusterIncoming =
lineageHelper.getVerticesByDirection(clusterVertex.get_id(), Direction.inComingVertices);
GraphAssert.assertVertexSanity(clusterIncoming);
for(FeedMerlin feed : inputFeeds) {
AssertUtil.checkForListSize(clusterIncoming.filterByName(feed.getName()), 1);
}
for(FeedMerlin feed : outputFeeds) {
AssertUtil.checkForListSize(clusterIncoming.filterByName(feed.getName()), 1);
}
}
@Test
public void testUserToEntityNode() throws Exception {
final VerticesResult userResult = lineageHelper.getVerticesByName(
MerlinConstants.CURRENT_USER_NAME);
GraphAssert.assertVertexSanity(userResult);
Vertex clusterVertex = userResult.getResults().get(0);
final VerticesResult userIncoming =
lineageHelper.getVerticesByDirection(clusterVertex.get_id(), Direction.inComingVertices);
GraphAssert.assertVertexSanity(userIncoming);
for(FeedMerlin feed : inputFeeds) {
AssertUtil.checkForListSize(userIncoming.filterByName(feed.getName()), 1);
}
for(FeedMerlin feed : outputFeeds) {
AssertUtil.checkForListSize(userIncoming.filterByName(feed.getName()), 1);
}
}
@AfterClass(alwaysRun = true)
public void tearDownClass() throws IOException {
cleanTestDirs();
}
}