blob: ad2a697aa979210acc260a11c18cfd9b3aa2e4ab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.util;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
public final class LineageUtils {
private LineageUtils() {}
private static final String VERTICES_ATTR_NAME = "vertices";
private static final String EDGES_ATTR_NAME = "edges";
private static final String VERTEX_ID_ATTR_NAME = "vertexId";
private static final String TEMP_STRUCT_ID_RESULT = "__IdType";
private static final AtomicInteger COUNTER = new AtomicInteger();
public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException {
String ret = null;
if (lineageInfo != null) {
Map<String, AtlasEntityHeader> entities = lineageInfo.getGuidEntityMap();
Set<AtlasLineageInfo.LineageRelation> relations = lineageInfo.getRelations();
AtlasLineageInfo.LineageDirection direction = lineageInfo.getLineageDirection();
Map<String, Struct> verticesMap = new HashMap<>();
// Lineage Entities mapping -> verticesMap (vertices)
for (String guid : entities.keySet()) {
AtlasEntityHeader entityHeader = entities.get(guid);
if (isDataSet(entityHeader.getTypeName(), registry)) {
Map<String, Object> vertexIdMap = new HashMap<>();
TypeSystem.IdType idType = TypeSystem.getInstance().getIdType();
vertexIdMap.put(idType.idAttrName(), guid);
vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.ACTIVE) ? "ACTIVE" : "DELETED");
vertexIdMap.put(idType.typeNameAttrName(), entityHeader.getTypeName());
Object qualifiedName = entityHeader.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
if (qualifiedName == null) {
qualifiedName = entityHeader.getDisplayText();
}
Map<String, Object> values = new HashMap<>();
values.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
values.put(VERTEX_ID_ATTR_NAME, constructResultStruct(vertexIdMap, true));
values.put(AtlasClient.NAME, entityHeader.getDisplayText());
verticesMap.put(guid, constructResultStruct(values, false));
}
}
// Lineage Relations mapping -> edgesMap (edges)
Map<String, List<String>> edgesMap = new HashMap<>();
for (AtlasLineageInfo.LineageRelation relation : relations) {
String fromEntityId = relation.getFromEntityId();
String toEntityId = relation.getToEntityId();
if (direction == AtlasLineageInfo.LineageDirection.INPUT) {
if (!edgesMap.containsKey(toEntityId)) {
edgesMap.put(toEntityId, new ArrayList<String>());
}
edgesMap.get(toEntityId).add(fromEntityId);
} else if (direction == AtlasLineageInfo.LineageDirection.OUTPUT) {
if (!edgesMap.containsKey(fromEntityId)) {
edgesMap.put(fromEntityId, new ArrayList<String>());
}
edgesMap.get(fromEntityId).add(toEntityId);
}
}
Map<String, Object> map = new HashMap<>();
map.put(VERTICES_ATTR_NAME, verticesMap);
map.put(EDGES_ATTR_NAME, edgesMap);
ret = InstanceSerialization.toJson(constructResultStruct(map, false), false);
}
return ret;
}
private static Struct constructResultStruct(Map<String, Object> values, boolean idType) {
if (idType) {
return new Struct(TEMP_STRUCT_ID_RESULT, values);
}
return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values);
}
private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException {
boolean ret = false;
AtlasType type = registry.getType(typeName);
if (type instanceof AtlasEntityType) {
AtlasEntityType entityType = (AtlasEntityType) type;
ret = entityType.getAllSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_DATASET);
}
return ret;
}
}