| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.repository.store.graph.v1; |
| |
| import org.apache.atlas.AtlasErrorCode; |
| import org.apache.atlas.annotation.GraphTransaction; |
| import org.apache.atlas.exception.AtlasBaseException; |
| import org.apache.atlas.model.TypeCategory; |
| import org.apache.atlas.model.instance.AtlasObjectId; |
| import org.apache.atlas.model.instance.AtlasRelationship; |
| import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; |
| import org.apache.atlas.repository.Constants; |
| import org.apache.atlas.repository.RepositoryException; |
| import org.apache.atlas.repository.graph.GraphHelper; |
| import org.apache.atlas.repository.graphdb.AtlasEdge; |
| import org.apache.atlas.repository.graphdb.AtlasVertex; |
| import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; |
| import org.apache.atlas.type.AtlasEntityType; |
| import org.apache.atlas.type.AtlasRelationshipType; |
| import org.apache.atlas.type.AtlasStructType.AtlasAttribute; |
| import org.apache.atlas.type.AtlasType; |
| import org.apache.atlas.type.AtlasTypeRegistry; |
| import org.apache.atlas.type.AtlasTypeUtil; |
| import org.apache.atlas.typesystem.exception.EntityNotFoundException; |
| import org.apache.commons.collections.MapUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.stereotype.Component; |
| |
| import javax.inject.Inject; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| @Component |
| public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { |
| private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV1.class); |
| private static final int DEFAULT_RELATIONSHIP_VERSION = 0; |
| |
| private final AtlasTypeRegistry typeRegistry; |
| private final EntityGraphRetriever entityRetriever; |
| private final GraphHelper graphHelper = GraphHelper.getInstance(); |
| |
/**
 * Builds the store on top of the given type registry. The same registry is also handed to the
 * {@link EntityGraphRetriever} this store creates for itself.
 *
 * @param typeRegistry registry used to resolve relationship and entity types
 */
@Inject
public AtlasRelationshipStoreV1(AtlasTypeRegistry typeRegistry) {
    this.entityRetriever = new EntityGraphRetriever(typeRegistry);
    this.typeRegistry    = typeRegistry;
}
| |
| @Override |
| @GraphTransaction |
| public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> create({})", relationship); |
| } |
| |
| validateRelationship(relationship); |
| |
| AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1()); |
| AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2()); |
| AtlasRelationship ret; |
| |
| // create relationship between two vertex |
| try { |
| AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship); |
| |
| if (relationshipEdge == null) { |
| relationshipEdge = createRelationshipEdge(end1Vertex, end2Vertex, relationship); |
| |
| AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); |
| |
| if (MapUtils.isNotEmpty(relationType.getAllAttributes())) { |
| for (AtlasAttribute attr : relationType.getAllAttributes().values()) { |
| String attrName = attr.getName(); |
| Object attrValue = relationship.getAttribute(attrName); |
| |
| AtlasGraphUtilsV1.setProperty(relationshipEdge, attr.getVertexPropertyName(), attrValue); |
| } |
| } |
| |
| ret = mapEdgeToAtlasRelationship(relationshipEdge); |
| |
| } else { |
| throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(), |
| relationship.getEnd1().getGuid(), relationship.getEnd2().getGuid()); |
| } |
| } catch (RepositoryException e) { |
| throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== create({}): {}", relationship, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getOrCreate({})", relationship); |
| } |
| |
| validateRelationship(relationship); |
| |
| AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1()); |
| AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2()); |
| AtlasRelationship ret; |
| |
| // check if relationship exists |
| AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship); |
| |
| ret = (relationshipEdge != null) ? mapEdgeToAtlasRelationship(relationshipEdge) : create(relationship); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getOrCreate({}): {}", relationship, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> update({})", relationship); |
| } |
| |
| AtlasRelationship ret = null; |
| |
| // TODO: update(relationship) implementation |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== update({}): {}", relationship, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasRelationship getById(String guid) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getById({})", guid); |
| } |
| |
| AtlasRelationship ret; |
| |
| try { |
| AtlasEdge edge = graphHelper.getEdgeForGUID(guid); |
| |
| ret = mapEdgeToAtlasRelationship(edge); |
| } catch (EntityNotFoundException ex) { |
| throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getById({}): {}", guid, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public void deleteById(String guid) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> deleteById({})", guid); |
| } |
| |
| // TODO: deleteById(guid) implementation |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== deleteById({}): {}", guid); |
| } |
| } |
| |
| private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException { |
| if (relationship == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null"); |
| } |
| |
| String relationshipName = relationship.getTypeName(); |
| String end1TypeName = getTypeNameFromObjectId(relationship.getEnd1()); |
| String end2TypeName = getTypeNameFromObjectId(relationship.getEnd2()); |
| AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName); |
| |
| if (relationshipType == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'"); |
| } |
| |
| if (relationship.getEnd1() == null || relationship.getEnd2() == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null"); |
| } |
| |
| if (!relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName) && |
| !relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) { |
| |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, |
| relationshipType.getEnd2Type().getTypeName(), end1TypeName); |
| } |
| |
| if (!relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName) && |
| !relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName)) { |
| |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, |
| relationshipType.getEnd1Type().getTypeName(), end2TypeName); |
| } |
| |
| validateEnd(relationship.getEnd1()); |
| validateEnd(relationship.getEnd2()); |
| |
| validateAndNormalize(relationship); |
| } |
| |
| private void validateEnd(AtlasObjectId end) throws AtlasBaseException { |
| String guid = end.getGuid(); |
| String typeName = end.getTypeName(); |
| Map<String, Object> uniqueAttributes = end.getUniqueAttributes(); |
| AtlasVertex endVertex = AtlasGraphUtilsV1.findByGuid(guid); |
| |
| if (!AtlasTypeUtil.isValidGuid(guid) || endVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } else if (MapUtils.isNotEmpty(uniqueAttributes)) { |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); |
| |
| if (AtlasGraphUtilsV1.findByUniqueAttributes(entityType, uniqueAttributes) == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, typeName, uniqueAttributes.toString()); |
| } |
| } |
| } |
| |
| private void validateAndNormalize(AtlasRelationship relationship) throws AtlasBaseException { |
| List<String> messages = new ArrayList<>(); |
| |
| if (! AtlasTypeUtil.isValidGuid(relationship.getGuid())) { |
| throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid()); |
| } |
| |
| AtlasRelationshipType type = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); |
| |
| if (type == null) { |
| throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.RELATIONSHIP.name(), relationship.getTypeName()); |
| } |
| |
| type.validateValue(relationship, relationship.getTypeName(), messages); |
| |
| if (!messages.isEmpty()) { |
| throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages); |
| } |
| |
| type.getNormalizedValue(relationship); |
| } |
| |
| private AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) { |
| String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship); |
| AtlasEdge ret = graphHelper.getEdgeForLabel(fromVertex, relationshipLabel); |
| |
| if (ret != null) { |
| AtlasVertex inVertex = ret.getInVertex(); |
| |
| if (inVertex != null) { |
| if (!StringUtils.equals(AtlasGraphUtilsV1.getIdFromVertex(inVertex), |
| AtlasGraphUtilsV1.getIdFromVertex(toVertex))) { |
| ret = null; |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| private int getRelationshipVersion(AtlasRelationship relationship) { |
| Long ret = relationship != null ? relationship.getVersion() : null; |
| |
| return (ret != null) ? ret.intValue() : DEFAULT_RELATIONSHIP_VERSION; |
| } |
| |
| private AtlasVertex getVertexFromEndPoint(AtlasObjectId endPoint) { |
| AtlasVertex ret = null; |
| |
| if (StringUtils.isNotEmpty(endPoint.getGuid())) { |
| ret = AtlasGraphUtilsV1.findByGuid(endPoint.getGuid()); |
| |
| } else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) { |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName()); |
| |
| ret = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes()); |
| } |
| |
| return ret; |
| } |
| |
| private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) |
| throws RepositoryException { |
| |
| String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship); |
| AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel); |
| |
| // map additional properties to relationship edge |
| if (ret != null) { |
| final String guid = UUID.randomUUID().toString(); |
| |
| AtlasGraphUtilsV1.setProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName()); |
| AtlasGraphUtilsV1.setProperty(ret, Constants.GUID_PROPERTY_KEY, guid); |
| AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship)); |
| } |
| |
| return ret; |
| } |
| |
| private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) { |
| |
| String ret = relationship.getRelationshipLabel(); |
| AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); |
| AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1(); |
| AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2(); |
| Set<String> fromVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV1.getTypeName(fromVertex)); |
| Set<String> toVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV1.getTypeName(toVertex)); |
| |
| // validate entity type and all its supertypes contains relationshipDefs end type |
| // e.g. [ hive_process -> hive_table] -> [ Process -> DataSet ] |
| if (fromVertexTypes.contains(endDef1.getType()) && toVertexTypes.contains(endDef2.getType())) { |
| String attributeName = endDef1.getName(); |
| AtlasAttribute endAttribute = relationshipType.getEnd1Type().getAttribute(attributeName); |
| |
| if (endAttribute != null) { |
| ret = endAttribute.getRelationshipEdgeLabel(); |
| } |
| |
| } else if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) { |
| String attributeName = endDef2.getName(); |
| AtlasAttribute endAttribute = relationshipType.getEnd2Type().getAttribute(attributeName); |
| |
| if (endAttribute != null) { |
| ret = endAttribute.getRelationshipEdgeLabel(); |
| } |
| } |
| |
| return ret; |
| } |
| |
| public Set<String> getTypeAndAllSuperTypes(String entityTypeName) { |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); |
| |
| return (entityType != null) ? entityType.getTypeAndAllSuperTypes() : new HashSet<String>(); |
| } |
| |
| private AtlasRelationship mapEdgeToAtlasRelationship(AtlasEdge edge) throws AtlasBaseException { |
| AtlasRelationship ret = new AtlasRelationship(); |
| |
| mapSystemAttributes(edge, ret); |
| |
| mapAttributes(edge, ret); |
| |
| return ret; |
| } |
| |
| private AtlasRelationship mapSystemAttributes(AtlasEdge edge, AtlasRelationship relationship) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Mapping system attributes for relationship"); |
| } |
| |
| relationship.setGuid(GraphHelper.getGuid(edge)); |
| relationship.setTypeName(GraphHelper.getTypeName(edge)); |
| |
| relationship.setCreatedBy(GraphHelper.getCreatedByAsString(edge)); |
| relationship.setUpdatedBy(GraphHelper.getModifiedByAsString(edge)); |
| |
| relationship.setCreateTime(new Date(GraphHelper.getCreatedTime(edge))); |
| relationship.setUpdateTime(new Date(GraphHelper.getModifiedTime(edge))); |
| |
| relationship.setVersion(GraphHelper.getVersion(edge).longValue()); |
| relationship.setStatus(GraphHelper.getEdgeStatus(edge)); |
| |
| AtlasVertex end1Vertex = edge.getOutVertex(); |
| AtlasVertex end2Vertex = edge.getInVertex(); |
| |
| relationship.setEnd1(new AtlasObjectId(GraphHelper.getGuid(end1Vertex), GraphHelper.getTypeName(end1Vertex))); |
| relationship.setEnd2(new AtlasObjectId(GraphHelper.getGuid(end2Vertex), GraphHelper.getTypeName(end2Vertex))); |
| |
| relationship.setLabel(edge.getLabel()); |
| |
| return relationship; |
| } |
| |
| private void mapAttributes(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException { |
| AtlasType objType = typeRegistry.getType(relationship.getTypeName()); |
| |
| if (!(objType instanceof AtlasRelationshipType)) { |
| throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, relationship.getTypeName()); |
| } |
| |
| AtlasRelationshipType relationshipType = (AtlasRelationshipType) objType; |
| |
| for (AtlasAttribute attribute : relationshipType.getAllAttributes().values()) { |
| // mapping only primitive attributes |
| Object attrValue = entityRetriever.mapVertexToPrimitive(edge, attribute.getQualifiedName(), |
| attribute.getAttributeDef()); |
| |
| relationship.setAttribute(attribute.getName(), attrValue); |
| } |
| } |
| |
| private String getTypeNameFromObjectId(AtlasObjectId objectId) { |
| String typeName = objectId.getTypeName(); |
| |
| if (StringUtils.isBlank(typeName)) { |
| typeName = AtlasGraphUtilsV1.getTypeNameFromGuid(objectId.getGuid()); |
| } |
| |
| return typeName; |
| } |
| } |