blob: 3d9d4845b09daeb9bb8e0bbe3b33ddb5a0ec0a45 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.examples.sampleapp;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.type.AtlasTypeUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.atlas.examples.sampleapp.SampleAppConstants.*;
import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectId;
import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectIds;
/**
 * Sample that creates, fetches and deletes a small set of Atlas entities through
 * {@link AtlasClientV2}: one database, two tables (each with three columns) and one
 * process that reads from the US table and writes to the Canada table.
 *
 * <p>The created entities are kept in instance fields so that {@link #deleteEntities()}
 * can remove them later. Instances hold mutable state and perform no synchronization.
 */
public class EntityExample {
    private static final String DATABASE_NAME                = "employee_db_entity";
    private static final String TABLE_NAME                   = "employee_table_entity";
    private static final String PROCESS_NAME                 = "employee_process_entity";
    private static final String METADATA_NAMESPACE_SUFFIX    = "@cl1";
    private static final String MANAGED_TABLE                = "Managed";
    private static final String ATTR_NAME                    = "name";
    private static final String ATTR_DESCRIPTION             = "description";
    private static final String ATTR_QUALIFIED_NAME          = "qualifiedName";
    private static final String REFERENCEABLE_ATTRIBUTE_NAME = ATTR_QUALIFIED_NAME;
    private static final String COLUMN_TIME_ID               = "time_id";
    private static final String COLUMN_CUSTOMER_ID           = "customer_id";
    private static final String COLUMN_COMPANY_ID            = "company_id";

    private final AtlasClientV2 client;

    private AtlasEntity       dbEntity;
    private AtlasEntity       tableEntityUS;
    private AtlasEntity       tableEntityCanada;
    private AtlasEntityHeader loadProcess;

    /**
     * @param client client used for every create/get/delete call made by this example
     */
    EntityExample(AtlasClientV2 client) {
        this.client = client;
    }

    /**
     * Creates the database, the Canada and US tables and the load process, in that order.
     * Each entity is only created while its field is still {@code null}, so entities that
     * were already created by an earlier call are not created again.
     *
     * <p>If the process cannot be created (the create helper returns {@code null}), the
     * failure is logged instead of dereferencing the missing header.
     *
     * @throws Exception declared for callers; propagated from the private create helpers
     */
    public void createEntities() throws Exception {
        if (dbEntity == null) {
            dbEntity = createDatabaseEntity(DATABASE_NAME);

            logCreated(dbEntity.getTypeName(), dbEntity.getAttribute(ATTR_QUALIFIED_NAME), dbEntity.getGuid());
        }

        if (tableEntityCanada == null) {
            tableEntityCanada = createTableEntity(TABLE_NAME + "_CANADA");

            logCreated(tableEntityCanada.getTypeName(), tableEntityCanada.getAttribute(ATTR_QUALIFIED_NAME), tableEntityCanada.getGuid());
        }

        if (tableEntityUS == null) {
            tableEntityUS = createTableEntity(TABLE_NAME + "_US");

            logCreated(tableEntityUS.getTypeName(), tableEntityUS.getAttribute(ATTR_QUALIFIED_NAME), tableEntityUS.getGuid());
        }

        if (loadProcess == null) {
            loadProcess = createProcessEntity(PROCESS_NAME);

            // createEntity() returns null when the call fails or reports no created entity
            if (loadProcess != null) {
                logCreated(loadProcess.getTypeName(), loadProcess.getAttribute(ATTR_QUALIFIED_NAME), loadProcess.getGuid());
            } else {
                SampleApp.log("Failed to create entity: name=" + PROCESS_NAME);
            }
        }
    }

    /**
     * @return the US table entity, or {@code null} if {@link #createEntities()} has not created it yet
     */
    public AtlasEntity getTableEntity() {
        return tableEntityUS;
    }

    /**
     * Fetches the entity with the given guid and logs it when the client returns a non-null result.
     *
     * @param entityGuid guid of the entity to retrieve
     * @throws Exception if the client call fails
     */
    public void getEntityByGuid(String entityGuid) throws Exception {
        AtlasEntityWithExtInfo entity = client.getEntityByGuid(entityGuid);

        if (entity != null) {
            SampleApp.log("Retrieved entity with guid=" + entityGuid);
            SampleApp.log(" " + entity);
        }
    }

    /**
     * Deletes the process first, then the two tables and the database in one bulk call.
     * Entities that were never created (field still {@code null}) or that have no guid are
     * skipped, so this method is safe to call after a partially failed {@link #createEntities()}.
     *
     * @throws Exception if a delete call fails
     */
    public void deleteEntities() throws Exception {
        if (loadProcess != null && loadProcess.getGuid() != null) {
            client.deleteEntityByGuid(loadProcess.getGuid());

            SampleApp.log("Deleted entity: guid=" + loadProcess.getGuid());
        }

        List<String> entityGuids = new ArrayList<>();

        for (AtlasEntity entity : Arrays.asList(tableEntityUS, tableEntityCanada, dbEntity)) {
            if (entity != null && entity.getGuid() != null) {
                entityGuids.add(entity.getGuid());
            }
        }

        if (!entityGuids.isEmpty()) {
            client.deleteEntitiesByGuids(entityGuids);

            SampleApp.log("Deleted entities:");

            for (String entityGuid : entityGuids) {
                SampleApp.log(" guid=" + entityGuid);
            }
        }
    }

    /** Logs the standard "Created entity" line used for every entity created by this example. */
    private static void logCreated(String typeName, Object qualifiedName, String guid) {
        SampleApp.log("Created entity: typeName=" + typeName + ", qualifiedName=" + qualifiedName + ", guid=" + guid);
    }

    /**
     * Builds and creates a managed table in {@link #dbEntity} with the three sample columns
     * and the metric classification.
     */
    private AtlasEntity createTableEntity(String tableName) throws Exception {
        return createHiveTable(dbEntity, tableName, MANAGED_TABLE,
                Arrays.asList(createColumn(COLUMN_TIME_ID, "int", "time id"),
                        createColumn(COLUMN_CUSTOMER_ID, "int", "customer id", SampleAppConstants.PII_TAG),
                        createColumn(COLUMN_COMPANY_ID, "double", "company id", SampleAppConstants.FINANCE_TAG)),
                SampleAppConstants.METRIC_TAG);
    }

    /** Creates the sample process with the US table as input and the Canada table as output. */
    private AtlasEntityHeader createProcessEntity(String processName) throws Exception {
        return createProcess(processName, "hive query for monthly avg salary", "user ETL",
                asList(tableEntityUS),
                asList(tableEntityCanada),
                "create table as select ", "plan", "id", "graph", SampleAppConstants.CLASSIFIED_TAG);
    }

    /**
     * Builds a process entity from the given values and submits it.
     *
     * @return header of the created process, or {@code null} if creation did not succeed
     */
    private AtlasEntityHeader createProcess(String name, String description, String user, List<AtlasEntity> inputs, List<AtlasEntity> outputs,
                                            String queryText, String queryPlan, String queryId, String queryGraph, String... classificationNames) throws Exception {
        AtlasEntity entity    = new AtlasEntity(SampleAppConstants.PROCESS_TYPE);
        long        startTime = System.currentTimeMillis(); // read once so end time is exactly start + 10000

        entity.setAttribute(ATTR_NAME, name);
        entity.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + METADATA_NAMESPACE_SUFFIX);
        entity.setAttribute(ATTR_DESCRIPTION, description);
        entity.setAttribute(ATTR_USERNAME, user);
        entity.setAttribute(ATTR_START_TIME, startTime);
        entity.setAttribute(ATTR_END_TIME, startTime + 10000);
        entity.setAttribute(ATTR_QUERY_TEXT, queryText);
        entity.setAttribute(ATTR_QUERY_PLAN, queryPlan);
        entity.setAttribute(ATTR_QUERY_ID, queryId);
        entity.setAttribute(ATTR_QUERY_GRAPH, queryGraph);
        entity.setAttribute(ATTR_OPERATION_TYPE, "testOperation");

        entity.setRelationshipAttribute(ATTR_INPUTS, toAtlasRelatedObjectIds(inputs));
        entity.setRelationshipAttribute(ATTR_OUTPUTS, toAtlasRelatedObjectIds(outputs));

        entity.setClassifications(toAtlasClassifications(classificationNames));

        return createEntity(new AtlasEntityWithExtInfo(entity));
    }

    /**
     * Builds (but does not submit) a column entity; columns are submitted together with
     * their table as referred entities.
     */
    private AtlasEntity createColumn(String name, String dataType, String comment, String... classificationNames) {
        AtlasEntity ret = new AtlasEntity(SampleAppConstants.COLUMN_TYPE);

        ret.setAttribute(ATTR_NAME, name);
        // NOTE(review): the qualifiedName does not include the table name, so the columns of
        // the US and Canada tables get identical qualifiedNames - confirm this is intended.
        ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + METADATA_NAMESPACE_SUFFIX);
        ret.setAttribute(ATTR_DATA_TYPE, dataType);
        ret.setAttribute(ATTR_COMMENT, comment);

        ret.setClassifications(toAtlasClassifications(classificationNames));

        return ret;
    }

    /**
     * Converts classification names to classification objects.
     *
     * @return a new mutable list; empty when {@code classificationNames} is {@code null} or empty
     */
    private List<AtlasClassification> toAtlasClassifications(String[] classificationNames) {
        List<AtlasClassification> ret = new ArrayList<>();

        if (classificationNames != null) {
            for (String classificationName : classificationNames) {
                ret.add(new AtlasClassification(classificationName));
            }
        }

        return ret;
    }

    /**
     * Submits the entity (and any referred entities) and returns the header of the created entity.
     *
     * <p>The request may carry referred entities in addition to the main one, so the header
     * whose type name matches the main entity is preferred; if none matches, the first
     * created header is returned. (Presumably the response lists referred entities too -
     * the ordering is not visible from this file.)
     *
     * @return the created entity's header, or {@code null} if the call failed or the response
     *         reported no created entities
     */
    private AtlasEntityHeader createEntity(AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        try {
            EntityMutationResponse response = client.createEntity(atlasEntityWithExtInfo);

            if (response == null) {
                return null;
            }

            List<AtlasEntityHeader> created = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

            if (created == null || created.isEmpty()) {
                return null;
            }

            AtlasEntity requested     = atlasEntityWithExtInfo != null ? atlasEntityWithExtInfo.getEntity() : null;
            String      requestedType = requested != null ? requested.getTypeName() : null;

            if (requestedType != null) {
                for (AtlasEntityHeader header : created) {
                    if (header != null && requestedType.equals(header.getTypeName())) {
                        return header;
                    }
                }
            }

            return created.get(0);
        } catch (AtlasServiceException e) {
            // best-effort sample: report the failure and let the caller see a null header
            SampleApp.log("failed in create entity");
            e.printStackTrace();
        }

        return null;
    }

    /**
     * Builds the database entity, submits it and copies the guid from the response onto the
     * local instance when one is returned.
     *
     * @return the locally built database entity (never {@code null})
     */
    private AtlasEntity createDatabaseEntity(String dbName) {
        AtlasEntity       hiveDBInstance = createHiveDBInstance(dbName);
        AtlasEntityHeader entityHeader   = createEntity(new AtlasEntityWithExtInfo(hiveDBInstance));

        if (entityHeader != null && entityHeader.getGuid() != null) {
            hiveDBInstance.setGuid(entityHeader.getGuid());
        }

        return hiveDBInstance;
    }

    /**
     * Builds (but does not submit) a database entity with fixed sample attribute values.
     *
     * @param dbName value for the name attribute and prefix of the qualifiedName
     */
    protected AtlasEntity createHiveDBInstance(String dbName) {
        AtlasEntity entity = new AtlasEntity(SampleAppConstants.DATABASE_TYPE);

        entity.setAttribute(ATTR_NAME, dbName);
        entity.setAttribute(ATTR_DESCRIPTION, "employee database");
        // NOTE(review): this uses the "@cl1" suffix constant as the attribute *name*; a
        // cluster/namespace attribute was presumably intended - confirm against the type definition.
        entity.setAttribute(METADATA_NAMESPACE_SUFFIX, "employeeCluster");
        entity.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, dbName + METADATA_NAMESPACE_SUFFIX);
        entity.setAttribute(ATTR_OWNER, "user");
        entity.setAttribute(ATTR_LOCATION_URI, "/tmp");
        entity.setAttribute(ATTR_CREATE_TIME, 1000);

        return entity;
    }

    /**
     * Builds a table with the given columns, links table and columns to each other, submits
     * them in one request and copies the guid from the response onto the table when one is returned.
     *
     * @return the locally built table entity (never {@code null})
     */
    private AtlasEntity createHiveTable(AtlasEntity database, String tableName, String tableType, List<AtlasEntity> columns, String... classificationNames) throws Exception {
        AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntityWithExtInfo();
        AtlasEntity            hiveTableInstance = createHiveTable(database, tableName, tableType, classificationNames);

        entityWithExtInfo.setEntity(hiveTableInstance);
        hiveTableInstance.setRelationshipAttribute(ATTR_COLUMNS, toAtlasRelatedObjectIds(columns));

        // each column points back at its table and travels with the request as a referred entity
        for (AtlasEntity column : columns) {
            column.setRelationshipAttribute(ATTR_TABLE, toAtlasRelatedObjectId(hiveTableInstance));

            entityWithExtInfo.addReferredEntity(column);
        }

        AtlasEntityHeader createdHeader = createEntity(entityWithExtInfo);

        if (createdHeader != null && createdHeader.getGuid() != null) {
            hiveTableInstance.setGuid(createdHeader.getGuid());
        }

        return hiveTableInstance;
    }

    /**
     * Builds (but does not submit) a table entity in the given database, with fixed sample
     * attribute values and two serde structs.
     */
    private AtlasEntity createHiveTable(AtlasEntity database, String tableName, String tableType, String... classificationNames) throws Exception {
        AtlasEntity table = new AtlasEntity(SampleAppConstants.TABLE_TYPE);

        table.setAttribute(ATTR_NAME, tableName);
        table.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, database.getAttribute(ATTR_NAME) + "." + tableName + METADATA_NAMESPACE_SUFFIX);
        table.setAttribute(ATTR_TABLE_TYPE, tableType);
        table.setRelationshipAttribute(ATTR_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, TABLE_DATABASE_TYPE));

        table.setAttribute(ATTR_DESCRIPTION, "emp table");
        table.setAttribute(ATTR_LAST_ACCESS_TIME, "2014-07-11T08:00:00.000Z");
        table.setAttribute(ATTR_LEVEL, 2);
        table.setAttribute(ATTR_COMPRESSED, false);
        table.setClassifications(toAtlasClassifications(classificationNames));

        AtlasStruct serde1 = new AtlasStruct(STRUCT_TYPE_SERDE);

        serde1.setAttribute(ATTR_NAME, "serde1");
        serde1.setAttribute(ATTR_SERDE, "serde1");
        table.setAttribute(ATTR_SERDE1, serde1);

        AtlasStruct serde2 = new AtlasStruct(STRUCT_TYPE_SERDE);

        serde2.setAttribute(ATTR_NAME, "serde2");
        serde2.setAttribute(ATTR_SERDE, "serde2");
        table.setAttribute(ATTR_SERDE2, serde2);

        return table;
    }
}