blob: 512fff470d04bc09f4b3d52aaefcecc7816c25f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasRelationshipAccessRequest;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
@Component
public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV2.class);
private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final DeleteHandlerV1 deleteHandler;
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasEntityChangeNotifier entityChangeNotifier;
@Inject
public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler, AtlasEntityChangeNotifier entityChangeNotifier) {
this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.deleteHandler = deleteHandler;
this.entityChangeNotifier = entityChangeNotifier;
}
@Override
@GraphTransaction
public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> create({})", relationship);
}
AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship);
AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null;
if (LOG.isDebugEnabled()) {
LOG.debug("<== create({}): {}", relationship, ret);
}
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
return ret;
}
@Override
@GraphTransaction
public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> update({})", relationship);
}
String guid = relationship.getGuid();
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
}
AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
String edgeType = AtlasGraphUtilsV2.getTypeName(edge);
AtlasVertex end1Vertex = edge.getOutVertex();
AtlasVertex end2Vertex = edge.getInVertex();
// update shouldn't change endType
if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED, guid, edgeType, relationship.getTypeName());
}
// update shouldn't change ends
if (relationship.getEnd1() != null) {
String updatedEnd1Guid = relationship.getEnd1().getGuid();
if (updatedEnd1Guid == null) {
AtlasVertex updatedEnd1Vertex = getVertexFromEndPoint(relationship.getEnd1());
updatedEnd1Guid = updatedEnd1Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd1Vertex);
}
if (updatedEnd1Guid != null) {
String end1Guid = AtlasGraphUtilsV2.getIdFromVertex(end1Vertex);
if (!StringUtils.equalsIgnoreCase(relationship.getEnd1().getGuid(), end1Guid)) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, edgeType, guid, end1Guid, relationship.getEnd1().getGuid());
}
}
}
// update shouldn't change ends
if (relationship.getEnd2() != null) {
String updatedEnd2Guid = relationship.getEnd2().getGuid();
if (updatedEnd2Guid == null) {
AtlasVertex updatedEnd2Vertex = getVertexFromEndPoint(relationship.getEnd2());
updatedEnd2Guid = updatedEnd2Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd2Vertex);
}
if (updatedEnd2Guid != null) {
String end2Guid = AtlasGraphUtilsV2.getIdFromVertex(end2Vertex);
if (!StringUtils.equalsIgnoreCase(relationship.getEnd2().getGuid(), end2Guid)) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, AtlasGraphUtilsV2.getTypeName(edge), guid, end2Guid, relationship.getEnd2().getGuid());
}
}
}
validateRelationship(end1Vertex, end2Vertex, edgeType, relationship.getAttributes());
AtlasRelationship ret = updateRelationship(edge, relationship);
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
if (LOG.isDebugEnabled()) {
LOG.debug("<== update({}): {}", relationship, ret);
}
return ret;
}
@Override
@GraphTransaction
public AtlasRelationship getById(String guid) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getById({})", guid);
}
AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
AtlasRelationship ret = entityRetriever.mapEdgeToAtlasRelationship(edge);
if (LOG.isDebugEnabled()) {
LOG.debug("<== getById({}): {}", guid, ret);
}
return ret;
}
@Override
@GraphTransaction
public AtlasRelationshipWithExtInfo getExtInfoById(String guid) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getExtInfoById({})", guid);
}
AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
AtlasRelationshipWithExtInfo ret = entityRetriever.mapEdgeToAtlasRelationshipWithExtInfo(edge);
if (LOG.isDebugEnabled()) {
LOG.debug("<== getExtInfoById({}): {}", guid, ret);
}
return ret;
}
@Override
@GraphTransaction
public void deleteById(String guid) throws AtlasBaseException {
deleteById(guid, false);
}
@Override
@GraphTransaction
public void deleteById(String guid, boolean forceDelete) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> deleteById({}, {})", guid, forceDelete);
}
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, " empty/null guid");
}
AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
if (edge == null) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
}
if (getState(edge) == DELETED) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
}
String relationShipType = GraphHelper.getTypeName(edge);
AtlasEntityHeader end1Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(edge.getOutVertex());
AtlasEntityHeader end2Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(edge.getInVertex());
AtlasAuthorizationUtils.verifyAccess(new AtlasRelationshipAccessRequest(typeRegistry,AtlasPrivilege.RELATIONSHIP_REMOVE, relationShipType, end1Entity, end2Entity ));
deleteHandler.deleteRelationships(Collections.singleton(edge), forceDelete);
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
if (LOG.isDebugEnabled()) {
LOG.debug("<== deleteById({}): {}", guid);
}
}
@Override
public AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
AtlasEdge ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
if (ret == null) {
validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
ret = createRelationship(end1Vertex, end2Vertex, relationship);
}
return ret;
}
@Override
public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getOrCreate({})", relationship);
}
validateRelationship(relationship);
AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
AtlasRelationship ret = null;
// check if relationship exists
AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
if (relationshipEdge == null) {
validateRelationship(relationship);
relationshipEdge = createRelationship(end1Vertex, end2Vertex, relationship);
}
if (relationshipEdge != null){
ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getOrCreate({}): {}", relationship, ret);
}
return ret;
}
private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
AtlasEdge ret = null;
try {
ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
if (ret != null) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
AtlasGraphUtilsV2.getIdFromVertex(end1Vertex), AtlasGraphUtilsV2.getIdFromVertex(end2Vertex));
}
AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
if (!relationType.hasLegacyAttributeEnd()) { // skip authorization for legacy attributes, as these would be covered as entity-update
AtlasEntityHeader end1Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(end1Vertex);
AtlasEntityHeader end2Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(end2Vertex);
AtlasAuthorizationUtils.verifyAccess(new AtlasRelationshipAccessRequest(typeRegistry, AtlasPrivilege.RELATIONSHIP_ADD, relationship.getTypeName(), end1Entity, end2Entity));
}
ret = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
String attrName = attr.getName();
String attrVertexProperty = attr.getVertexPropertyName();
Object attrValue = relationship.getAttribute(attrName);
AtlasGraphUtilsV2.setEncodedProperty(ret, attrVertexProperty, attrValue);
}
}
} catch (RepositoryException e) {
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
}
return ret;
}
private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
AtlasVertex end1Vertex = relationshipEdge.getOutVertex();
AtlasVertex end2Vertex = relationshipEdge.getInVertex();
AtlasEntityHeader end1Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(end1Vertex);
AtlasEntityHeader end2Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(end2Vertex);
AtlasAuthorizationUtils.verifyAccess(new AtlasRelationshipAccessRequest(typeRegistry, AtlasPrivilege.RELATIONSHIP_UPDATE, relationship.getTypeName(), end1Entity, end2Entity));
updateTagPropagations(relationshipEdge, relationship);
if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
String attrName = attr.getName();
String attrVertexProperty = attr.getVertexPropertyName();
if (relationship.hasAttribute(attrName)) {
AtlasGraphUtilsV2.setEncodedProperty(relationshipEdge, attrVertexProperty, relationship.getAttribute(attrName));
}
}
}
return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
}
private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedPropagatedClassifications) throws AtlasBaseException {
if (blockedPropagatedClassifications != null) {
List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge);
List<String> currentClassificationIds = getBlockedClassificationIds(edge);
List<AtlasVertex> currentBlockedPropagatedClassificationVertices = getBlockedClassificationVertices(propagatedClassificationVertices, currentClassificationIds);
List<AtlasVertex> updatedBlockedPropagatedClassificationVertices = new ArrayList<>();
List<String> updatedClassificationIds = new ArrayList<>();
for (AtlasClassification classification : blockedPropagatedClassifications) {
AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification);
// ignore invalid blocked propagated classification
if (classificationVertex == null) {
continue;
}
updatedBlockedPropagatedClassificationVertices.add(classificationVertex);
String classificationId = classificationVertex.getIdForDisplay();
updatedClassificationIds.add(classificationId);
}
addToBlockedClassificationIds(edge, updatedClassificationIds);
// remove propagated tag for added entry
List<AtlasVertex> addedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(updatedBlockedPropagatedClassificationVertices, currentBlockedPropagatedClassificationVertices);
for (AtlasVertex classificationVertex : addedBlockedClassifications) {
List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices);
}
// add propagated tag for removed entry
List<AtlasVertex> removedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(currentBlockedPropagatedClassificationVertices, updatedBlockedPropagatedClassificationVertices);
for (AtlasVertex classificationVertex : removedBlockedClassifications) {
List<AtlasVertex> addPropagationToVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
deleteHandler.addTagPropagation(classificationVertex, addPropagationToVertices);
}
}
}
private List<AtlasVertex> getBlockedClassificationVertices(List<AtlasVertex> classificationVertices, List<String> blockedClassificationIds) {
List<AtlasVertex> ret = new ArrayList<>();
if (CollectionUtils.isNotEmpty(blockedClassificationIds)) {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
if (blockedClassificationIds.contains(classificationId)) {
ret.add(classificationVertex);
}
}
}
return ret;
}
// propagated classifications should contain blocked propagated classification
private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
AtlasVertex ret = null;
for (AtlasVertex vertex : classificationVertices) {
String classificationName = getClassificationName(vertex);
String entityGuid = getClassificationEntityGuid(vertex);
if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
ret = vertex;
break;
}
}
return ret;
}
private void addToBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
if (edge != null) {
if (classificationIds.isEmpty()) {
edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
} else {
edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
}
}
}
private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
PropagateTags oldTagPropagation = getPropagateTags(edge);
PropagateTags newTagPropagation = relationship.getPropagateTags();
if (newTagPropagation != oldTagPropagation) {
List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
// Update propagation edge
AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
List<AtlasVertex> updatedClassificationVertices = getClassificationVertices(edge);
List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
// compute add/remove propagations list
Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
addPropagationsMap.putAll(updatedClassificationsMap);
} else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
removePropagationsMap.putAll(currentClassificationsMap);
} else {
for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
if (CollectionUtils.isNotEmpty(entitiesAdded)) {
addPropagationsMap.put(classificationVertex, entitiesAdded);
}
if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
removePropagationsMap.put(classificationVertex, entitiesRemoved);
}
}
}
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
deleteHandler.addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex));
}
for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
deleteHandler.removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
}
} else {
// update blocked propagated classifications only if there is no change is tag propagation (don't update both)
handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
}
}
private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
if (relationship == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
}
String relationshipName = relationship.getTypeName();
String end1TypeName = getTypeNameFromObjectId(relationship.getEnd1());
String end2TypeName = getTypeNameFromObjectId(relationship.getEnd2());
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
if (relationshipType == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
}
if (relationship.getEnd1() == null || relationship.getEnd2() == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null");
}
boolean validEndTypes = false;
if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
} else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
}
if (!validEndTypes) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
}
validateEnds(relationship);
validateAndNormalize(relationship);
}
private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, Map<String, Object> attributes) throws AtlasBaseException {
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
if (relationshipType == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
}
if (end1Vertex == null) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd1Type().getTypeName());
}
if (end2Vertex == null) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd2Type().getTypeName());
}
String end1TypeName = AtlasGraphUtilsV2.getTypeName(end1Vertex);
String end2TypeName = AtlasGraphUtilsV2.getTypeName(end2Vertex);
boolean validEndTypes = false;
if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
} else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
}
if (!validEndTypes) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
}
List<String> messages = new ArrayList<>();
AtlasRelationship relationship = new AtlasRelationship(relationshipName, attributes);
relationshipType.validateValue(relationship, relationshipName, messages);
if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
}
relationshipType.getNormalizedValue(relationship);
}
/**
* Validate the ends of the passed relationship
* @param relationship
* @throws AtlasBaseException
*/
private void validateEnds(AtlasRelationship relationship) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("validateEnds entry relationship:" + relationship);
}
List<AtlasObjectId> ends = new ArrayList<>();
List<AtlasRelationshipEndDef> endDefs = new ArrayList<>();
String relationshipTypeName = relationship.getTypeName();
AtlasRelationshipDef relationshipDef = typeRegistry.getRelationshipDefByName(relationshipTypeName);
ends.add(relationship.getEnd1());
ends.add(relationship.getEnd2());
endDefs.add(relationshipDef.getEndDef1());
endDefs.add(relationshipDef.getEndDef2());
for (int i = 0; i < ends.size(); i++) {
AtlasObjectId end = ends.get(i);
String guid = end.getGuid();
String typeName = end.getTypeName();
Map<String, Object> uniqueAttributes = end.getUniqueAttributes();
AtlasVertex endVertex = AtlasGraphUtilsV2.findByGuid(guid);
if (!AtlasTypeUtil.isValidGuid(guid) || endVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
} else if (MapUtils.isNotEmpty(uniqueAttributes)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqueAttributes) == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, typeName, uniqueAttributes.toString());
}
} else {
// check whether the guid is the correct type
String vertexTypeName = endVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
if (!Objects.equals(vertexTypeName, typeName)) {
String attrName = endDefs.get(i).getName();
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_INVALID_ENDTYPE, attrName, guid, vertexTypeName, typeName);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("validateEnds exit successfully validated relationship:" + relationship);
}
}
private void validateAndNormalize(AtlasRelationship relationship) throws AtlasBaseException {
List<String> messages = new ArrayList<>();
if (! AtlasTypeUtil.isValidGuid(relationship.getGuid())) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid());
}
AtlasRelationshipType type = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
if (type == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.RELATIONSHIP.name(), relationship.getTypeName());
}
type.validateValue(relationship, relationship.getTypeName(), messages);
if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
}
type.getNormalizedValue(relationship);
}
public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) {
String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
Iterator<AtlasEdge> edgesIterator = getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
AtlasEdge ret = null;
while (edgesIterator != null && edgesIterator.hasNext()) {
AtlasEdge edge = edgesIterator.next();
if (edge != null) {
Status status = graphHelper.getStatus(edge);
if ((status == null || status == ACTIVE) &&
StringUtils.equals(getIdFromVertex(edge.getInVertex()), getIdFromVertex(toVertex))) {
ret = edge;
break;
}
}
}
return ret;
}
private Long getRelationshipVersion(AtlasRelationship relationship) {
Long ret = relationship != null ? relationship.getVersion() : null;
return (ret != null) ? ret : DEFAULT_RELATIONSHIP_VERSION;
}
private AtlasVertex getVertexFromEndPoint(AtlasObjectId endPoint) {
AtlasVertex ret = null;
if (StringUtils.isNotEmpty(endPoint.getGuid())) {
ret = AtlasGraphUtilsV2.findByGuid(endPoint.getGuid());
} else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName());
ret = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes());
}
return ret;
}
private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException {
String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName());
PropagateTags tagPropagation = getRelationshipTagPropagation(fromVertex, toVertex, relationship);
AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel);
if (LOG.isDebugEnabled()) {
LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), relationshipLabel);
}
// map additional properties to relationship edge
if (ret != null) {
// Accept a valid (assigned) guid from the supplied relationship, or generate one.
String relationshipGuid = relationship.getGuid();
final String guid = AtlasTypeUtil.isAssignedGuid(relationshipGuid) ? relationshipGuid : UUID.randomUUID().toString();
AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName());
AtlasGraphUtilsV2.setEncodedProperty(ret, RELATIONSHIP_GUID_PROPERTY_KEY, guid);
AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getRelationshipVersion(relationship));
AtlasGraphUtilsV2.setEncodedProperty(ret, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
// blocked propagated classifications
handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
// propagate tags
deleteHandler.addTagPropagation(ret, tagPropagation);
}
return ret;
}
private PropagateTags getRelationshipTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1();
AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2();
Set<String> fromVertexTypes = getTypeAndAllSuperTypes(getTypeName(fromVertex));
Set<String> toVertexTypes = getTypeAndAllSuperTypes(getTypeName(toVertex));
PropagateTags ret = relationshipType.getRelationshipDef().getPropagateTags();
// relationshipDef is defined as end1 (hive_db) and end2 (hive_table) and tagPropagation = ONE_TO_TWO
// relationship edge exists from [hive_table --> hive_db]
// swap the tagPropagation property for such cases.
if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
if (ret == ONE_TO_TWO) {
ret = TWO_TO_ONE;
} else if (ret == TWO_TO_ONE) {
ret = ONE_TO_TWO;
}
}
return ret;
}
private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipTypeName) {
if (LOG.isDebugEnabled()) {
LOG.debug("getRelationshipEdgeLabel({})", relationshipTypeName);
}
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
String ret = relationshipType.getRelationshipDef().getRelationshipLabel();
AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1();
AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2();
Set<String> fromVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(fromVertex));
Set<String> toVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(toVertex));
AtlasAttribute attribute = null;
// validate entity type and all its supertypes contains relationshipDefs end type
// e.g. [hive_process -> hive_table] -> [Process -> DataSet]
if (fromVertexTypes.contains(endDef1.getType()) && toVertexTypes.contains(endDef2.getType())) {
String attributeName = endDef1.getName();
attribute = relationshipType.getEnd1Type().getRelationshipAttribute(attributeName);
} else if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
String attributeName = endDef2.getName();
attribute = relationshipType.getEnd2Type().getRelationshipAttribute(attributeName);
}
if (attribute != null) {
ret = attribute.getRelationshipEdgeLabel();
}
return ret;
}
public Set<String> getTypeAndAllSuperTypes(String entityTypeName) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
return (entityType != null) ? entityType.getTypeAndAllSuperTypes() : new HashSet<String>();
}
private String getTypeNameFromObjectId(AtlasObjectId objectId) {
String typeName = objectId.getTypeName();
if (StringUtils.isBlank(typeName)) {
typeName = AtlasGraphUtilsV2.getTypeNameFromGuid(objectId.getGuid());
}
return typeName;
}
/**
* Check whether this vertex has a relationship associated with this relationship type.
* @param vertex
* @param relationshipTypeName
* @return true if found an edge with this relationship type in.
*/
private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) {
String relationshipEdgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName);
Iterator<AtlasEdge> iter = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, relationshipEdgeLabel);
return (iter != null) ? iter.hasNext() : false;
}
private String getRelationshipEdgeLabel(String typeName, String relationshipTypeName) {
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
AtlasRelationshipDef relationshipDef = relationshipType.getRelationshipDef();
AtlasEntityType end1Type = relationshipType.getEnd1Type();
AtlasEntityType end2Type = relationshipType.getEnd2Type();
Set<String> vertexTypes = getTypeAndAllSuperTypes(typeName);
AtlasAttribute attribute = null;
if (vertexTypes.contains(end1Type.getTypeName())) {
String attributeName = relationshipDef.getEndDef1().getName();
attribute = (attributeName != null) ? end1Type.getAttribute(attributeName) : null;
} else if (vertexTypes.contains(end2Type.getTypeName())) {
String attributeName = relationshipDef.getEndDef2().getName();
attribute = (attributeName != null) ? end2Type.getAttribute(attributeName) : null;
}
return (attribute != null) ? attribute.getRelationshipEdgeLabel() : null;
}
}