blob: 2406f12be9abeba5bc198b9d993cd71df3827b45 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.integration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.v1.model.typedef.TraitTypeDefinition;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
/**
* Hive Lineage Integration Tests.
*/
public class DataSetLineageJerseyResourceIT extends BaseResourceIT {
private String salesFactTable;
private String salesMonthlyTable;
private String salesDBName;
private static String FACT = "Fact_Tag";
private static String ETL = "ETL_Tag";
private static String DIMENSION = "Dimension_Tag";
private static String METRIC = "Metric_Tag";
private static String PII = "pii_Tag";
@BeforeClass
public void setUp() throws Exception {
super.setUp();
createTypeDefinitionsV1();
setupInstances();
}
@Test
public void testInputsGraph() throws Exception {
String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId();
ObjectNode response = atlasClientV1.callAPIWithBodyAndParams(AtlasClient.API_V1.LINEAGE_INPUTS_GRAPH, null, tableId, "/inputs/graph");
Assert.assertNotNull(response);
System.out.println("inputs graph = " + response);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
JsonNode results = response.get(AtlasClient.RESULTS);
Assert.assertNotNull(results);
Struct resultsInstance = AtlasType.fromV1Json(results.toString(), Struct.class);
Map<String, Struct> vertices = (Map<String, Struct>) resultsInstance.get("vertices");
Assert.assertEquals(vertices.size(), 4);
Map<String, Struct> edges = (Map<String, Struct>) resultsInstance.get("edges");
Assert.assertEquals(edges.size(), 4);
}
@Test
public void testInputsGraphForEntity() throws Exception {
String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
salesMonthlyTable).getId()._getId();
ObjectNode results = atlasClientV1.getInputGraphForEntity(tableId);
Assert.assertNotNull(results);
Struct resultsInstance = AtlasType.fromV1Json(results.toString(), Struct.class);
resultsInstance.normalize();
Map<String, Object> vertices = (Map<String, Object>) resultsInstance.get("vertices");
Assert.assertEquals(vertices.size(), 4);
Object verticesObject = vertices.get(tableId);
Struct vertex = null;
if (verticesObject instanceof Map) {
vertex = new Struct ((Map)verticesObject);
} else if (verticesObject instanceof Struct) {
vertex = (Struct)verticesObject;
}
assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name());
Map<String, Struct> edges = (Map<String, Struct>) resultsInstance.get("edges");
Assert.assertEquals(edges.size(), 4);
}
@Test
public void testOutputsGraph() throws Exception {
String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId();
ObjectNode response = atlasClientV1.callAPIWithBodyAndParams(AtlasClient.API_V1.LINEAGE_INPUTS_GRAPH, null, tableId, "/outputs/graph");
Assert.assertNotNull(response);
System.out.println("outputs graph= " + response);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
JsonNode results = response.get(AtlasClient.RESULTS);
Assert.assertNotNull(results);
Struct resultsInstance = AtlasType.fromV1Json(results.toString(), Struct.class);
Map<String, Struct> vertices = (Map<String, Struct>) resultsInstance.get("vertices");
Assert.assertEquals(vertices.size(), 3);
Map<String, Struct> edges = (Map<String, Struct>) resultsInstance.get("edges");
Assert.assertEquals(edges.size(), 4);
}
@Test
public void testOutputsGraphForEntity() throws Exception {
String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId();
ObjectNode results = atlasClientV1.getOutputGraphForEntity(tableId);
Assert.assertNotNull(results);
Struct resultsInstance = AtlasType.fromV1Json(results.toString(), Struct.class);
Map<String, Object> vertices = (Map<String, Object>) resultsInstance.get("vertices");
Assert.assertEquals(vertices.size(), 3);
Object verticesObject = vertices.get(tableId);
Struct vertex = null;
if (verticesObject instanceof Map) {
vertex = new Struct ((Map)verticesObject);
} else if (verticesObject instanceof Struct) {
vertex = (Struct)verticesObject;
}
assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name());
Map<String, Struct> edges = (Map<String, Struct>) resultsInstance.get("edges");
Assert.assertEquals(edges.size(), 4);
}
@Test
public void testSchema() throws Exception {
String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId();
ObjectNode response = atlasClientV1.getSchemaForEntity(tableId);
}
@Test
public void testSchemaForEntity() throws Exception {
String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId();
ObjectNode results = atlasClientV1.getSchemaForEntity(tableId);
}
@Test(expectedExceptions = AtlasServiceException.class)
public void testSchemaForInvalidTable() throws Exception {
ObjectNode response = atlasClientV1.callAPIWithBodyAndParams(AtlasClient.API_V1.NAME_LINEAGE_SCHEMA, null, "blah", "schema");
}
@Test(expectedExceptions = AtlasServiceException.class)
public void testSchemaForDB() throws Exception {
ObjectNode response = atlasClientV1.callAPIWithBodyAndParams(AtlasClient.API_V1.NAME_LINEAGE_SCHEMA, null, salesDBName, "schema");
}
private void setupInstances() throws Exception {
TraitTypeDefinition factTrait = TypesUtil.createTraitTypeDef(FACT, null, Collections.<String>emptySet());
TraitTypeDefinition etlTrait = TypesUtil.createTraitTypeDef(ETL, null, Collections.<String>emptySet());
TraitTypeDefinition dimensionTrait = TypesUtil.createTraitTypeDef(DIMENSION, null, Collections.<String>emptySet());
TraitTypeDefinition metricTrait = TypesUtil.createTraitTypeDef(METRIC, null, Collections.<String>emptySet());
createType(getTypesDef(null, null, Arrays.asList(factTrait, etlTrait, dimensionTrait, metricTrait), null));
salesDBName = "Sales" + randomString();
Id salesDB = database(salesDBName, "Sales Database", "John ETL",
"hdfs://host:8000/apps/warehouse/sales");
List<Referenceable> salesFactColumns = Arrays.asList(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
column("customer_id", "int", "customer id", PII),
column("sales", "double", "product id", METRIC));
salesFactTable = "sales_fact" + randomString();
Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns, FACT);
List<Referenceable> timeDimColumns = Arrays.asList(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
column("weekDay", "int", "week Day"));
Id timeDim =
table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL",
timeDimColumns, DIMENSION);
Id reportingDB =
database("Reporting" + randomString(), "reporting database", "Jane BI",
"hdfs://host:8000/apps/warehouse/reporting");
Id salesFactDaily =
table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
"Joe BI", "MANAGED", salesFactColumns, METRIC);
loadProcess("loadSalesDaily" + randomString(), "John ETL", Arrays.asList(salesFact, timeDim),
Collections.singletonList(salesFactDaily), "create table as select ", "plan", "id", "graph", ETL);
salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
Id salesFactMonthly =
table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI",
"MANAGED", salesFactColumns, METRIC);
loadProcess("loadSalesMonthly" + randomString(), "John ETL", Collections.singletonList(salesFactDaily),
Collections.singletonList(salesFactMonthly), "create table as select ", "plan", "id", "graph", ETL);
}
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set(NAME, name);
referenceable.set(QUALIFIED_NAME, name);
referenceable.set(CLUSTER_NAME, locationUri + name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
return createInstance(referenceable);
}
Referenceable column(String name, String type, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set(NAME, name);
referenceable.set(QUALIFIED_NAME, name);
referenceable.set("type", type);
referenceable.set("comment", comment);
return referenceable;
}
Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
referenceable.set("columns", columns);
return createInstance(referenceable);
}
Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("userName", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("operationType", "testOperation");
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
referenceable.set("queryGraph", queryGraph);
return createInstance(referenceable);
}
}