blob: 0ca0ba74813e003c68987966727616d52e34cfe4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.examples;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
import javax.ws.rs.core.MultivaluedMap;
import java.util.*;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF;
/**
* A driver that sets up sample types and entities using v2 types and entity model for testing purposes.
*/
public class QuickStartV2 {
public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
public static final String SALES_DB = "Sales";
public static final String REPORTING_DB = "Reporting";
public static final String LOGGING_DB = "Logging";
public static final String SALES_FACT_TABLE = "sales_fact";
public static final String PRODUCT_DIM_TABLE = "product_dim";
public static final String CUSTOMER_DIM_TABLE = "customer_dim";
public static final String TIME_DIM_TABLE = "time_dim";
public static final String SALES_FACT_DAILY_MV_TABLE = "sales_fact_daily_mv";
public static final String SALES_FACT_MONTHLY_MV_TABLE = "sales_fact_monthly_mv";
public static final String LOG_FACT_DAILY_MV_TABLE = "log_fact_daily_mv";
public static final String LOG_FACT_MONTHLY_MV_TABLE = "logging_fact_monthly_mv";
public static final String TIME_ID_COLUMN = "time_id";
public static final String PRODUCT_ID_COLUMN = "product_id";
public static final String CUSTOMER_ID_COLUMN = "customer_id";
public static final String APP_ID_COLUMN = "app_id";
public static final String MACHINE_ID_COLUMN = "machine_id";
public static final String PRODUCT_NAME_COLUMN = "product_name";
public static final String BRAND_NAME_COLUMN = "brand_name";
public static final String NAME_COLUMN = "name";
public static final String SALES_COLUMN = "sales";
public static final String LOG_COLUMN = "log";
public static final String ADDRESS_COLUMN = "address";
public static final String DAY_OF_YEAR_COLUMN = "dayOfYear";
public static final String WEEKDAY_COLUMN = "weekDay";
public static final String DIMENSION_CLASSIFICATION = "Dimension";
public static final String FACT_CLASSIFICATION = "Fact";
public static final String PII_CLASSIFICATION = "PII";
public static final String METRIC_CLASSIFICATION = "Metric";
public static final String ETL_CLASSIFICATION = "ETL";
public static final String JDBC_CLASSIFICATION = "JdbcAccess";
public static final String LOGDATA_CLASSIFICATION = "Log Data";
public static final String LOAD_SALES_DAILY_PROCESS = "loadSalesDaily";
public static final String LOAD_SALES_MONTHLY_PROCESS = "loadSalesMonthly";
public static final String LOAD_LOGS_MONTHLY_PROCESS = "loadLogsMonthly";
public static final String PRODUCT_DIM_VIEW = "product_dim_view";
public static final String CUSTOMER_DIM_VIEW = "customer_dim_view";
public static final String DATABASE_TYPE = "DB";
public static final String COLUMN_TYPE = "Column";
public static final String TABLE_TYPE = "Table";
public static final String VIEW_TYPE = "View";
public static final String LOAD_PROCESS_TYPE = "LoadProcess";
public static final String STORAGE_DESC_TYPE = "StorageDesc";
public static final String[] TYPES = { DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, LOAD_PROCESS_TYPE,
VIEW_TYPE, JDBC_CLASSIFICATION, ETL_CLASSIFICATION, METRIC_CLASSIFICATION,
PII_CLASSIFICATION, FACT_CLASSIFICATION, DIMENSION_CLASSIFICATION, LOGDATA_CLASSIFICATION };
public static void main(String[] args) throws Exception {
String[] basicAuthUsernamePassword = null;
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
}
runQuickstart(args, basicAuthUsernamePassword);
}
@VisibleForTesting
static void runQuickstart(String[] args, String[] basicAuthUsernamePassword) throws Exception {
String[] urls = getServerUrl(args);
QuickStartV2 quickStartV2;
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
quickStartV2 = new QuickStartV2(urls, basicAuthUsernamePassword);
} else {
quickStartV2 = new QuickStartV2(urls);
}
// Shows how to create v2 types in Atlas for your meta model
quickStartV2.createTypes();
// Shows how to create v2 entities (instances) for the added types in Atlas
quickStartV2.createEntities();
// Shows some search queries using DSL based on types
quickStartV2.search();
// Shows some lineage information on entity
quickStartV2.lineage();
}
static String[] getServerUrl(String[] args) throws AtlasException {
if (args.length > 0) {
return args[0].split(",");
}
Configuration configuration = ApplicationProperties.get();
String[] urls = configuration.getStringArray(ATLAS_REST_ADDRESS);
if (ArrayUtils.isEmpty(urls)) {
System.out.println("org.apache.atlas.examples.QuickStartV2 <Atlas REST address <http/https>://<atlas-fqdn>:<atlas-port> like http://localhost:21000>");
System.exit(-1);
}
return urls;
}
private final AtlasClientV2 atlasClientV2;
QuickStartV2(String[] urls, String[] basicAuthUsernamePassword) {
atlasClientV2 = new AtlasClientV2(urls,basicAuthUsernamePassword);
}
QuickStartV2(String[] urls) throws AtlasException {
atlasClientV2 = new AtlasClientV2(urls);
}
void createTypes() throws Exception {
AtlasTypesDef atlasTypesDef = createTypeDefinitions();
System.out.println("\nCreating sample types: ");
atlasClientV2.createAtlasTypeDefs(atlasTypesDef);
verifyTypesCreated();
}
AtlasTypesDef createTypeDefinitions() throws Exception {
AtlasEntityDef dbType = AtlasTypeUtil.createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, "1.0", null,
AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"),
AtlasTypeUtil.createOptionalAttrDef("description", "string"),
AtlasTypeUtil.createOptionalAttrDef("locationUri", "string"),
AtlasTypeUtil.createOptionalAttrDef("owner", "string"),
AtlasTypeUtil.createOptionalAttrDef("createTime", "long"));
AtlasEntityDef sdType = AtlasTypeUtil.createClassTypeDef(STORAGE_DESC_TYPE, STORAGE_DESC_TYPE, "1.0", null,
AtlasTypeUtil.createOptionalAttrDefWithConstraint("table", TABLE_TYPE, CONSTRAINT_TYPE_INVERSE_REF,
new HashMap<String, Object>() {{ put(CONSTRAINT_PARAM_ATTRIBUTE, "sd"); }}),
AtlasTypeUtil.createOptionalAttrDef("location", "string"),
AtlasTypeUtil.createOptionalAttrDef("inputFormat", "string"),
AtlasTypeUtil.createOptionalAttrDef("outputFormat", "string"),
AtlasTypeUtil.createRequiredAttrDef("compressed", "boolean"));
AtlasEntityDef colType = AtlasTypeUtil.createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, "1.0", null,
AtlasTypeUtil.createOptionalAttrDef("name", "string"),
AtlasTypeUtil.createOptionalAttrDef("dataType", "string"),
AtlasTypeUtil.createOptionalAttrDef("comment", "string"),
AtlasTypeUtil.createOptionalAttrDefWithConstraint("table", TABLE_TYPE, CONSTRAINT_TYPE_INVERSE_REF,
new HashMap<String, Object>() {{ put(CONSTRAINT_PARAM_ATTRIBUTE, "columns"); }}));
colType.setOptions(new HashMap<String, String>() {{ put("schemaAttributes", "[\"name\", \"description\", \"owner\", \"type\", \"comment\", \"position\"]"); }});
AtlasEntityDef tblType = AtlasTypeUtil.createClassTypeDef(TABLE_TYPE, TABLE_TYPE, "1.0", Collections.singleton("DataSet"),
AtlasTypeUtil.createRequiredAttrDef("db", DATABASE_TYPE),
AtlasTypeUtil.createRequiredAttrDefWithConstraint("sd", STORAGE_DESC_TYPE, CONSTRAINT_TYPE_OWNED_REF, null),
AtlasTypeUtil.createOptionalAttrDef("owner", "string"),
AtlasTypeUtil.createOptionalAttrDef("createTime", "long"),
AtlasTypeUtil.createOptionalAttrDef("lastAccessTime", "long"),
AtlasTypeUtil.createOptionalAttrDef("retention", "long"),
AtlasTypeUtil.createOptionalAttrDef("viewOriginalText", "string"),
AtlasTypeUtil.createOptionalAttrDef("viewExpandedText", "string"),
AtlasTypeUtil.createOptionalAttrDef("tableType", "string"),
AtlasTypeUtil.createOptionalAttrDef("temporary", "boolean"),
AtlasTypeUtil.createRequiredListAttrDefWithConstraint("columns", AtlasBaseTypeDef.getArrayTypeName(COLUMN_TYPE),
CONSTRAINT_TYPE_OWNED_REF, null));
tblType.setOptions(new HashMap<String, String>() {{ put("schemaElementsAttribute", "columns"); }});
AtlasEntityDef procType = AtlasTypeUtil.createClassTypeDef(LOAD_PROCESS_TYPE, LOAD_PROCESS_TYPE, "1.0", Collections.singleton("Process"),
AtlasTypeUtil.createOptionalAttrDef("userName", "string"),
AtlasTypeUtil.createOptionalAttrDef("startTime", "long"),
AtlasTypeUtil.createOptionalAttrDef("endTime", "long"),
AtlasTypeUtil.createRequiredAttrDef("queryText", "string"),
AtlasTypeUtil.createRequiredAttrDef("queryPlan", "string"),
AtlasTypeUtil.createRequiredAttrDef("queryId", "string"),
AtlasTypeUtil.createRequiredAttrDef("queryGraph", "string"));
AtlasEntityDef viewType = AtlasTypeUtil.createClassTypeDef(VIEW_TYPE, VIEW_TYPE, "1.0", Collections.singleton("DataSet"),
AtlasTypeUtil.createRequiredAttrDef("db", DATABASE_TYPE),
AtlasTypeUtil.createOptionalListAttrDef("inputTables", AtlasBaseTypeDef.getArrayTypeName(TABLE_TYPE)));
AtlasClassificationDef dimClassifDef = AtlasTypeUtil.createTraitTypeDef(DIMENSION_CLASSIFICATION, "Dimension Classification", "1.0", Collections.<String>emptySet());
AtlasClassificationDef factClassifDef = AtlasTypeUtil.createTraitTypeDef(FACT_CLASSIFICATION, "Fact Classification", "1.0", Collections.<String>emptySet());
AtlasClassificationDef piiClassifDef = AtlasTypeUtil.createTraitTypeDef(PII_CLASSIFICATION, "PII Classification", "1.0", Collections.<String>emptySet());
AtlasClassificationDef metricClassifDef = AtlasTypeUtil.createTraitTypeDef(METRIC_CLASSIFICATION, "Metric Classification", "1.0", Collections.<String>emptySet());
AtlasClassificationDef etlClassifDef = AtlasTypeUtil.createTraitTypeDef(ETL_CLASSIFICATION, "ETL Classification", "1.0", Collections.<String>emptySet());
AtlasClassificationDef jdbcClassifDef = AtlasTypeUtil.createTraitTypeDef(JDBC_CLASSIFICATION, "JdbcAccess Classification", "1.0", Collections.<String>emptySet());
AtlasClassificationDef logClassifDef = AtlasTypeUtil.createTraitTypeDef(LOGDATA_CLASSIFICATION, "LogData Classification", "1.0", Collections.<String>emptySet());
return AtlasTypeUtil.getTypesDef(Collections.<AtlasEnumDef>emptyList(),
Collections.<AtlasStructDef>emptyList(),
Arrays.asList(dimClassifDef, factClassifDef, piiClassifDef, metricClassifDef, etlClassifDef, jdbcClassifDef, logClassifDef),
Arrays.asList(dbType, sdType, colType, tblType, procType, viewType));
}
void createEntities() throws Exception {
System.out.println("\nCreating sample entities: ");
// Database entities
AtlasEntity salesDB = createDatabase(SALES_DB, "sales database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
AtlasEntity reportingDB = createDatabase(REPORTING_DB, "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
AtlasEntity logDB = createDatabase(LOGGING_DB, "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
// Storage Descriptor entities
AtlasEntity storageDesc = createStorageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true);
// Column entities
List<AtlasEntity> salesFactColumns = Arrays.asList(createColumn(TIME_ID_COLUMN, "int", "time id"),
createColumn(PRODUCT_ID_COLUMN, "int", "product id"),
createColumn(CUSTOMER_ID_COLUMN, "int", "customer id", PII_CLASSIFICATION),
createColumn(SALES_COLUMN, "double", "product id", METRIC_CLASSIFICATION));
List<AtlasEntity> logFactColumns = Arrays.asList(createColumn(TIME_ID_COLUMN, "int", "time id"),
createColumn(APP_ID_COLUMN, "int", "app id"),
createColumn(MACHINE_ID_COLUMN, "int", "machine id"),
createColumn(LOG_COLUMN, "string", "log data", LOGDATA_CLASSIFICATION));
List<AtlasEntity> productDimColumns = Arrays.asList(createColumn(PRODUCT_ID_COLUMN, "int", "product id"),
createColumn(PRODUCT_NAME_COLUMN, "string", "product name"),
createColumn(BRAND_NAME_COLUMN, "int", "brand name"));
List<AtlasEntity> timeDimColumns = Arrays.asList(createColumn(TIME_ID_COLUMN, "int", "time id"),
createColumn(DAY_OF_YEAR_COLUMN, "int", "day Of Year"),
createColumn(WEEKDAY_COLUMN, "int", "week Day"));
List<AtlasEntity> customerDimColumns = Arrays.asList(createColumn(CUSTOMER_ID_COLUMN, "int", "customer id", PII_CLASSIFICATION),
createColumn(NAME_COLUMN, "string", "customer name", PII_CLASSIFICATION),
createColumn(ADDRESS_COLUMN, "string", "customer address", PII_CLASSIFICATION));
// Table entities
AtlasEntity salesFact = createTable(SALES_FACT_TABLE, "sales fact table", salesDB, storageDesc,
"Joe", "Managed", salesFactColumns, FACT_CLASSIFICATION);
AtlasEntity productDim = createTable(PRODUCT_DIM_TABLE, "product dimension table", salesDB, storageDesc,
"John Doe", "Managed", productDimColumns, DIMENSION_CLASSIFICATION);
AtlasEntity customerDim = createTable(CUSTOMER_DIM_TABLE, "customer dimension table", salesDB, storageDesc,
"fetl", "External", customerDimColumns, DIMENSION_CLASSIFICATION);
AtlasEntity timeDim = createTable(TIME_DIM_TABLE, "time dimension table", salesDB, storageDesc,
"John Doe", "External", timeDimColumns, DIMENSION_CLASSIFICATION);
AtlasEntity loggingFactDaily = createTable(LOG_FACT_DAILY_MV_TABLE, "log fact daily materialized view", logDB,
storageDesc, "Tim ETL", "Managed", logFactColumns, LOGDATA_CLASSIFICATION);
AtlasEntity loggingFactMonthly = createTable(LOG_FACT_MONTHLY_MV_TABLE, "logging fact monthly materialized view", logDB,
storageDesc, "Tim ETL", "Managed", logFactColumns, LOGDATA_CLASSIFICATION);
AtlasEntity salesFactDaily = createTable(SALES_FACT_DAILY_MV_TABLE, "sales fact daily materialized view", reportingDB,
storageDesc, "Joe BI", "Managed", salesFactColumns, METRIC_CLASSIFICATION);
AtlasEntity salesFactMonthly = createTable(SALES_FACT_MONTHLY_MV_TABLE, "sales fact monthly materialized view", reportingDB,
storageDesc, "Jane BI", "Managed", salesFactColumns, METRIC_CLASSIFICATION);
// View entities
createView(PRODUCT_DIM_VIEW, reportingDB, Collections.singletonList(productDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION);
createView(CUSTOMER_DIM_VIEW, reportingDB, Collections.singletonList(customerDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION);
// Process entities
createProcess(LOAD_SALES_DAILY_PROCESS, "hive query for daily summary", "John ETL",
Arrays.asList(salesFact, timeDim),
Collections.singletonList(salesFactDaily),
"create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
createProcess(LOAD_SALES_MONTHLY_PROCESS, "hive query for monthly summary", "John ETL",
Collections.singletonList(salesFactDaily),
Collections.singletonList(salesFactMonthly),
"create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
createProcess(LOAD_LOGS_MONTHLY_PROCESS, "hive query for monthly summary", "Tim ETL",
Collections.singletonList(loggingFactDaily),
Collections.singletonList(loggingFactMonthly),
"create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
}
private AtlasEntity createInstance(AtlasEntity entity, String[] traitNames) throws Exception {
AtlasEntity ret = null;
EntityMutationResponse response = atlasClientV2.createEntity(new AtlasEntityWithExtInfo(entity));
List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityOperation.CREATE);
if (CollectionUtils.isNotEmpty(entities)) {
AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
ret = getByGuidResponse.getEntity();
System.out.println("Created entity of type [" + ret.getTypeName() + "], guid: " + ret.getGuid());
}
return ret;
}
AtlasEntity createDatabase(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
AtlasEntity entity = new AtlasEntity(DATABASE_TYPE);
entity.setClassifications(toAtlasClassifications(traitNames));
entity.setAttribute("name", name);
entity.setAttribute("description", description);
entity.setAttribute("owner", owner);
entity.setAttribute("locationuri", locationUri);
entity.setAttribute("createTime", System.currentTimeMillis());
return createInstance(entity, traitNames);
}
private List<AtlasClassification> toAtlasClassifications(String[] traitNames) {
List<AtlasClassification> ret = new ArrayList<>();
List<String> traits = Arrays.asList(traitNames);
if (CollectionUtils.isNotEmpty(traits)) {
for (String trait : traits) {
ret.add(new AtlasClassification(trait));
}
}
return ret;
}
AtlasEntity createStorageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
throws Exception {
AtlasEntity entity = new AtlasEntity(STORAGE_DESC_TYPE);
entity.setAttribute("location", location);
entity.setAttribute("inputFormat", inputFormat);
entity.setAttribute("outputFormat", outputFormat);
entity.setAttribute("compressed", compressed);
return createInstance(entity, null);
}
AtlasEntity createColumn(String name, String dataType, String comment, String... traitNames) throws Exception {
AtlasEntity entity = new AtlasEntity(COLUMN_TYPE);
entity.setClassifications(toAtlasClassifications(traitNames));
entity.setAttribute("name", name);
entity.setAttribute("dataType", dataType);
entity.setAttribute("comment", comment);
return createInstance(entity, traitNames);
}
AtlasEntity createTable(String name, String description, AtlasEntity db, AtlasEntity sd, String owner, String tableType,
List<AtlasEntity> columns, String... traitNames) throws Exception {
AtlasEntity entity = new AtlasEntity(TABLE_TYPE);
entity.setClassifications(toAtlasClassifications(traitNames));
entity.setAttribute("name", name);
entity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
entity.setAttribute("description", description);
entity.setAttribute("owner", owner);
entity.setAttribute("tableType", tableType);
entity.setAttribute("createTime", System.currentTimeMillis());
entity.setAttribute("lastAccessTime", System.currentTimeMillis());
entity.setAttribute("retention", System.currentTimeMillis());
entity.setAttribute("db", AtlasTypeUtil.getAtlasObjectId(db));
entity.setAttribute("sd", AtlasTypeUtil.getAtlasObjectId(sd));
entity.setAttribute("columns", AtlasTypeUtil.toObjectIds(columns));
return createInstance(entity, traitNames);
}
AtlasEntity createProcess(String name, String description, String user, List<AtlasEntity> inputs, List<AtlasEntity> outputs,
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
AtlasEntity entity = new AtlasEntity(LOAD_PROCESS_TYPE);
entity.setClassifications(toAtlasClassifications(traitNames));
entity.setAttribute(AtlasClient.NAME, name);
entity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
entity.setAttribute("description", description);
entity.setAttribute("inputs", inputs);
entity.setAttribute("outputs", outputs);
entity.setAttribute("user", user);
entity.setAttribute("startTime", System.currentTimeMillis());
entity.setAttribute("endTime", System.currentTimeMillis() + 10000);
entity.setAttribute("queryText", queryText);
entity.setAttribute("queryPlan", queryPlan);
entity.setAttribute("queryId", queryId);
entity.setAttribute("queryGraph", queryGraph);
return createInstance(entity, traitNames);
}
AtlasEntity createView(String name, AtlasEntity db, List<AtlasEntity> inputTables, String... traitNames) throws Exception {
AtlasEntity entity = new AtlasEntity(VIEW_TYPE);
entity.setClassifications(toAtlasClassifications(traitNames));
entity.setAttribute("name", name);
entity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
entity.setAttribute("db", db);
entity.setAttribute("inputTables", inputTables);
return createInstance(entity, traitNames);
}
private void verifyTypesCreated() throws Exception {
MultivaluedMap<String, String> searchParams = new MultivaluedMapImpl();
for (String typeName : TYPES) {
searchParams.clear();
searchParams.add(SearchFilter.PARAM_NAME, typeName);
SearchFilter searchFilter = new SearchFilter(searchParams);
AtlasTypesDef searchDefs = atlasClientV2.getAllTypeDefs(searchFilter);
assert (!searchDefs.isEmpty());
System.out.println("Created type [" + typeName + "]");
}
}
private String[] getDSLQueries() {
return new String[]{
"from DB",
"DB",
"DB where name=\"Reporting\"",
"DB where DB.name=\"Reporting\"",
"DB name = \"Reporting\"",
"DB DB.name = \"Reporting\"",
"DB where name=\"Reporting\" select name, owner",
"DB where DB.name=\"Reporting\" select name, owner",
"DB has name",
"DB where DB has name",
//--TODO: Fix "DB, Table", // Table, db; Table db works
"DB is JdbcAccess",
"from Table",
"Table",
"Table is Dimension",
"Column where Column isa PII",
"View is Dimension",
"Column select Column.name",
"Column select name",
"Column where Column.name=\"customer_id\"",
"from Table select Table.name",
"DB where (name = \"Reporting\")",
//--TODO: Fix "DB where (name = \"Reporting\") select name as _col_0, owner as _col_1",
"DB where DB is JdbcAccess",
"DB where DB has name",
//--TODO: Fix "DB Table",
"DB as db1 Table where (db1.name = \"Reporting\")",
//--TODO: Fix "DB where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 ", // N
DIMENSION_CLASSIFICATION,
JDBC_CLASSIFICATION,
ETL_CLASSIFICATION,
METRIC_CLASSIFICATION,
PII_CLASSIFICATION,
"`Log Data`",
"Table where name=\"sales_fact\", columns",
"Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment",
"from DataSet",
"from Process" };
}
private void search() throws Exception {
System.out.println("\nSample DSL Queries: ");
for (String dslQuery : getDSLQueries()) {
try {
AtlasSearchResult results = atlasClientV2.dslSearchWithParams(dslQuery, 10, 0);
if (results != null) {
List<AtlasEntityHeader> entitiesResult = results.getEntities();
List<AtlasFullTextResult> fullTextResults = results.getFullTextResult();
AttributeSearchResult attribResult = results.getAttributes();
if (CollectionUtils.isNotEmpty(entitiesResult)) {
System.out.println("query [" + dslQuery + "] returned [" + entitiesResult.size() + "] rows.");
} else if (CollectionUtils.isNotEmpty(fullTextResults)) {
System.out.println("query [" + dslQuery + "] returned [" + fullTextResults.size() + "] rows.");
} else if (attribResult != null) {
System.out.println("query [" + dslQuery + "] returned [" + attribResult.getValues().size() + "] rows.");
}
} else {
System.out.println("query [" + dslQuery + "] failed, results:" + results);
}
} catch (Exception e) {
System.out.println("query [" + dslQuery + "] execution failed!");
}
}
}
private void lineage() throws AtlasServiceException {
System.out.println("\nSample Lineage Info: ");
AtlasLineageInfo lineageInfo = atlasClientV2.getLineageInfo(getTableId(SALES_FACT_DAILY_MV_TABLE), LineageDirection.BOTH, 0);
Set<LineageRelation> relations = lineageInfo.getRelations();
Map<String, AtlasEntityHeader> guidEntityMap = lineageInfo.getGuidEntityMap();
for (LineageRelation relation : relations) {
AtlasEntityHeader fromEntity = guidEntityMap.get(relation.getFromEntityId());
AtlasEntityHeader toEntity = guidEntityMap.get(relation.getToEntityId());
System.out.println(fromEntity.getDisplayText() + "(" + fromEntity.getTypeName() + ") -> " +
toEntity.getDisplayText() + "(" + toEntity.getTypeName() + ")");
}
}
private String getTableId(String tableName) throws AtlasServiceException {
Map<String, String> attributes = new HashMap<>();
attributes.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableName);
AtlasEntity tableEntity = atlasClientV2.getEntityByAttribute(TABLE_TYPE, attributes).getEntity();
return tableEntity.getGuid();
}
}