blob: 7c6b60b772115b0d936dcd8b2cacb7bfd1f5f24e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.*;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.testng.annotations.Guice;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
/**
* Base Class to set up hive types and instances for tests
*/
@Guice(modules = TestModules.TestOnlyModule.class)
public class BaseRepositoryTest {
@Inject
protected MetadataService metadataService;
@Inject
protected MetadataRepository repository;
protected void setUp() throws Exception {
//force graph initialization / built in type registration
TestUtils.getGraph();
setUpDefaultTypes();
setUpTypes();
TestUtils.getGraph().commit();
new GraphBackedSearchIndexer(new AtlasTypeRegistry());
TestUtils.resetRequestContext();
setupInstances();
TestUtils.getGraph().commit();
TestUtils.dumpGraph(TestUtils.getGraph());
}
protected void tearDown() throws Exception {
TypeSystem.getInstance().reset();
}
private void setUpTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
metadataService.createType(typesAsJSON);
}
protected static final String DATABASE_TYPE = "hive_db";
protected static final String HIVE_TABLE_TYPE = "hive_table";
private static final String COLUMN_TYPE = "hive_column";
private static final String HIVE_PROCESS_TYPE = "hive_process";
private static final String STORAGE_DESC_TYPE = "StorageDesc";
private static final String VIEW_TYPE = "View";
private static final String PARTITION_TYPE = "hive_partition";
protected static final String DATASET_SUBTYPE = "dataset_subtype";
TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
.createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
.createClassTypeDef(STORAGE_DESC_TYPE, null,
attrDef("location", DataTypes.STRING_TYPE),
attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.DATE_TYPE),
attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
// todo - uncomment this, something is broken
new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null));
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
attrDef("endTime", DataTypes.LONG_TYPE),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
.createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null));
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
};
HierarchicalTypeDefinition<ClassType> partClsDef =
new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
attributeDefinitions);
HierarchicalTypeDefinition<ClassType> datasetSubTypeClsDef = TypesUtil
.createClassTypeDef(DATASET_SUBTYPE, ImmutableSet.of("DataSet"));
HierarchicalTypeDefinition < TraitType > dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
HierarchicalTypeDefinition<TraitType> isaKeywordTraitDef = TypesUtil.createTraitTypeDef("isa", null);
return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef,
isaKeywordTraitDef),
ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef, datasetSubTypeClsDef));
}
AttributeDefinition attrDef(String name, IDataType dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
return attrDef(name, dT, m, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
String reverseAttributeName) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dT);
return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
}
private void setupInstances() throws Exception {
Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
Referenceable sd =
storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
column("time_id", "int", "time id")));
List<Referenceable> salesFactColumns = ImmutableList
.of(column("time_id", "int", "time id"),
column("product_id", "int", "product id"),
column("customer_id", "int", "customer id", "PII"),
column("sales", "double", "product id", "Metric"));
Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
List<Referenceable> logFactColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
List<Referenceable> timeDimColumns = ImmutableList
.of(column("time_id", "int", "time id"),
column("dayOfYear", "int", "day Of Year"),
column("weekDay", "int", "week Day"));
Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
"Dimension");
Id reportingDB =
database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
Id salesFactDaily =
table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
salesFactColumns, "Metric");
Id circularLineageTable1 = table("table1", "", reportingDB, sd, "Vimal", "Managed", salesFactColumns, "Metric");
Id circularLineageTable2 = table("table2", "", reportingDB, sd, "Vimal", "Managed", salesFactColumns, "Metric");
loadProcess("circularLineage1", "hive query for daily summary", "John ETL", ImmutableList.of(circularLineageTable1),
ImmutableList.of(circularLineageTable2), "create table as select ", "plan", "id", "graph", "ETL");
loadProcess("circularLineage2", "hive query for daily summary", "John ETL", ImmutableList.of(circularLineageTable2),
ImmutableList.of(circularLineageTable1), "create table as select ", "plan", "id", "graph", "ETL");
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
Id loggingFactDaily =
table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
logFactColumns, "Log Data");
List<Referenceable> productDimColumns = ImmutableList
.of(column("product_id", "int", "product id"),
column("product_name", "string", "product name"),
column("brand_name", "int", "brand name"));
Id productDim =
table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
"Dimension");
view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
List<Referenceable> customerDimColumns = ImmutableList.of(
column("customer_id", "int", "customer id", "PII"),
column("name", "string", "customer name", "PII"),
column("address", "string", "customer address", "PII"));
Id customerDim =
table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
"Dimension");
view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
Id salesFactMonthly =
table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
"Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
Id loggingFactMonthly =
table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
"Managed", logFactColumns, "Log Data");
loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily);
datasetSubType("dataSetSubTypeInst1", "testOwner");
}
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
return createInstance(referenceable, clsType);
}
protected Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location);
referenceable.set("inputFormat", inputFormat);
referenceable.set("outputFormat", outputFormat);
referenceable.set("compressed", compressed);
referenceable.set("cols", columns);
return referenceable;
}
protected Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);
return referenceable;
}
protected Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("temporary", false);
referenceable.set("createTime", new Date(System.currentTimeMillis()));
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
// todo - uncomment this, something is broken
referenceable.set("sd", sd);
referenceable.set("columns", columns);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
return createInstance(referenceable, clsType);
}
protected Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
referenceable.set("queryGraph", queryGraph);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
return createInstance(referenceable, clsType);
}
Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("db", dbId);
referenceable.set("inputTables", inputTables);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
return createInstance(referenceable, clsType);
}
Id partition(List<String> values, Id table, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
referenceable.set("values", values);
referenceable.set("table", table);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
return createInstance(referenceable, clsType);
}
Id datasetSubType(final String name, String owner) throws Exception {
Referenceable referenceable = new Referenceable(DATASET_SUBTYPE);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set(AtlasClient.NAME, name);
referenceable.set("owner", owner);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATASET_SUBTYPE);
return createInstance(referenceable, clsType);
}
private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
List<String> guids = repository.createEntities(typedInstance).getCreatedEntities();
// return the reference to created instance with guid
return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
}
private void setUpDefaultTypes() throws Exception {
TypesDef typesDef = createDefaultTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
metadataService.createType(typesAsJSON);
}
TypesDef createDefaultTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
.createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, true, true, null));
HierarchicalTypeDefinition<ClassType> assetType = TypesUtil
.createClassTypeDef(AtlasClient.ASSET_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition(AtlasClient.NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, false, true, null),
TypesUtil.createOptionalAttrDef(AtlasClient.DESCRIPTION, DataTypes.STRING_TYPE),
new AttributeDefinition(AtlasClient.OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, false, true, null));
HierarchicalTypeDefinition<ClassType> infraType = TypesUtil
.createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
.createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE),
new AttributeDefinition(PROCESS_ATTRIBUTE_INPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(PROCESS_ATTRIBUTE_OUTPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(referenceableType, assetType, infraType, datasetType, processType));
}
}