| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.repository.graph; |
| |
| import com.google.inject.Inject; |
| import com.thinkaurelius.titan.core.SchemaViolationException; |
| import com.tinkerpop.blueprints.Direction; |
| import com.tinkerpop.blueprints.Edge; |
| import com.tinkerpop.blueprints.Vertex; |
| import org.apache.atlas.AtlasException; |
| import org.apache.atlas.RequestContext; |
| import org.apache.atlas.repository.Constants; |
| import org.apache.atlas.repository.RepositoryException; |
| import org.apache.atlas.typesystem.IReferenceableInstance; |
| import org.apache.atlas.typesystem.ITypedInstance; |
| import org.apache.atlas.typesystem.ITypedReferenceableInstance; |
| import org.apache.atlas.typesystem.ITypedStruct; |
| import org.apache.atlas.typesystem.exception.EntityExistsException; |
| import org.apache.atlas.typesystem.exception.EntityNotFoundException; |
| import org.apache.atlas.typesystem.persistence.Id; |
| import org.apache.atlas.typesystem.persistence.ReferenceableInstance; |
| import org.apache.atlas.typesystem.types.AttributeInfo; |
| import org.apache.atlas.typesystem.types.ClassType; |
| import org.apache.atlas.typesystem.types.DataTypes; |
| import org.apache.atlas.typesystem.types.EnumValue; |
| import org.apache.atlas.typesystem.types.IDataType; |
| import org.apache.atlas.typesystem.types.Multiplicity; |
| import org.apache.atlas.typesystem.types.ObjectGraphWalker; |
| import org.apache.atlas.typesystem.types.TraitType; |
| import org.apache.atlas.typesystem.types.TypeSystem; |
| import org.apache.atlas.typesystem.types.TypeUtils; |
| import org.apache.atlas.utils.MD5Utils; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.collections.MapUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.security.MessageDigest; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.apache.atlas.repository.graph.GraphHelper.string; |
| |
| public final class TypedInstanceToGraphMapper { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TypedInstanceToGraphMapper.class); |
| private final Map<Id, Vertex> idToVertexMap = new HashMap<>(); |
| private final TypeSystem typeSystem = TypeSystem.getInstance(); |
| private static final GraphHelper graphHelper = GraphHelper.getInstance(); |
| |
| private DeleteHandler deleteHandler; |
| private GraphToTypedInstanceMapper graphToTypedInstanceMapper; |
| |
| @Inject |
| public TypedInstanceToGraphMapper(GraphToTypedInstanceMapper graphToTypedInstanceMapper, DeleteHandler deleteHandler) { |
| this.graphToTypedInstanceMapper = graphToTypedInstanceMapper; |
| this.deleteHandler = deleteHandler; |
| } |
| |
| private final String SIGNATURE_HASH_PROPERTY_KEY = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "signature"; |
| |
| public enum Operation { |
| CREATE, |
| UPDATE_PARTIAL, |
| UPDATE_FULL |
| } |
| |
| void mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances) |
| throws AtlasException { |
| |
| RequestContext requestContext = RequestContext.get(); |
| for (ITypedReferenceableInstance typedInstance : typedInstances) { |
| LOG.debug("Adding/updating entity {}", typedInstance); |
| Collection<IReferenceableInstance> newInstances = walkClassInstances(typedInstance); |
| TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair = |
| createVerticesAndDiscoverInstances(newInstances); |
| List<ITypedReferenceableInstance> entitiesToCreate = instancesPair.left; |
| List<ITypedReferenceableInstance> entitiesToUpdate = instancesPair.right; |
| FullTextMapper fulltextMapper = new FullTextMapper(graphToTypedInstanceMapper); |
| switch (operation) { |
| case CREATE: |
| List<String> ids = addOrUpdateAttributesAndTraits(operation, entitiesToCreate); |
| addFullTextProperty(entitiesToCreate, fulltextMapper); |
| requestContext.recordEntityCreate(ids); |
| break; |
| |
| case UPDATE_FULL: |
| case UPDATE_PARTIAL: |
| ids = addOrUpdateAttributesAndTraits(Operation.CREATE, entitiesToCreate); |
| requestContext.recordEntityCreate(ids); |
| ids = addOrUpdateAttributesAndTraits(operation, entitiesToUpdate); |
| requestContext.recordEntityUpdate(ids); |
| |
| addFullTextProperty(entitiesToCreate, fulltextMapper); |
| addFullTextProperty(entitiesToUpdate, fulltextMapper); |
| break; |
| |
| default: |
| throw new UnsupportedOperationException("Not handled - " + operation); |
| } |
| } |
| } |
| |
| private Collection<IReferenceableInstance> walkClassInstances(ITypedReferenceableInstance typedInstance) |
| throws RepositoryException { |
| |
| EntityProcessor entityProcessor = new EntityProcessor(); |
| try { |
| LOG.debug("Walking the object graph for instance {}", typedInstance.toShortString()); |
| new ObjectGraphWalker(typeSystem, entityProcessor, typedInstance).walk(); |
| } catch (AtlasException me) { |
| throw new RepositoryException("TypeSystem error when walking the ObjectGraph", me); |
| } |
| |
| entityProcessor.addInstanceIfNotExists(typedInstance); |
| return entityProcessor.getInstances(); |
| } |
| |
| private List<String> addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException { |
| List<String> guids = new ArrayList<>(); |
| for (ITypedReferenceableInstance instance : instances) { |
| try { |
| //new vertex, set all the properties |
| String guid = addOrUpdateAttributesAndTraits(operation, instance); |
| guids.add(guid); |
| } catch (SchemaViolationException e) { |
| throw new EntityExistsException(instance, e); |
| } |
| } |
| return guids; |
| } |
| |
| private String addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance) |
| throws AtlasException { |
| LOG.debug("Adding/Updating typed instance {}", typedInstance.toShortString()); |
| |
| Id id = typedInstance.getId(); |
| if (id == null) { // oops |
| throw new RepositoryException("id cannot be null"); |
| } |
| |
| Vertex instanceVertex = idToVertexMap.get(id); |
| |
| // add the attributes for the instance |
| ClassType classType = typeSystem.getDataType(ClassType.class, typedInstance.getTypeName()); |
| final Map<String, AttributeInfo> fields = classType.fieldMapping().fields; |
| |
| mapInstanceToVertex(typedInstance, instanceVertex, fields, false, operation); |
| |
| if (Operation.CREATE.equals(operation)) { |
| //TODO - Handle Trait updates |
| addTraits(typedInstance, instanceVertex, classType); |
| } |
| return getId(typedInstance)._getId(); |
| } |
| |
| void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex, |
| Map<String, AttributeInfo> fields, boolean mapOnlyUniqueAttributes, Operation operation) |
| throws AtlasException { |
| |
| LOG.debug("Mapping instance {} to vertex {}", typedInstance.toShortString(), string(instanceVertex)); |
| for (AttributeInfo attributeInfo : fields.values()) { |
| if (mapOnlyUniqueAttributes && !attributeInfo.isUnique) { |
| continue; |
| } |
| mapAttributeToVertex(typedInstance, instanceVertex, attributeInfo, operation); |
| } |
| GraphHelper.setProperty(instanceVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, |
| RequestContext.get().getRequestTime()); |
| } |
| |
| void mapAttributeToVertex(ITypedInstance typedInstance, Vertex instanceVertex, |
| AttributeInfo attributeInfo, Operation operation) throws AtlasException { |
| |
| final Map<String, Object> valuesMap = typedInstance.getValuesMap(); |
| if ( typedInstance.isValueSet(attributeInfo.name) || operation == Operation.CREATE ) { |
| |
| Object attrValue = typedInstance.get(attributeInfo.name); |
| LOG.debug("Mapping attribute {} = {}", attributeInfo.name, attrValue); |
| |
| switch (attributeInfo.dataType().getTypeCategory()) { |
| case PRIMITIVE: |
| case ENUM: |
| mapPrimitiveOrEnumToVertex(typedInstance, instanceVertex, attributeInfo); |
| break; |
| |
| case ARRAY: |
| mapArrayCollectionToVertex(typedInstance, instanceVertex, attributeInfo, operation); |
| break; |
| |
| case MAP: |
| mapMapCollectionToVertex(typedInstance, instanceVertex, attributeInfo, operation); |
| break; |
| |
| case STRUCT: |
| case CLASS: |
| String edgeLabel = graphHelper.getEdgeLabel(typedInstance, attributeInfo); |
| |
| Edge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel); |
| String newEdgeId = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(), |
| attrValue, currentEdge, edgeLabel, operation); |
| |
| if (currentEdge != null && !currentEdge.getId().toString().equals(newEdgeId)) { |
| deleteHandler.deleteEdgeReference(currentEdge, attributeInfo.dataType().getTypeCategory(), |
| attributeInfo.isComposite, true); |
| } |
| break; |
| |
| case TRAIT: |
| // do NOTHING - this is taken care of earlier |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown type category: " + attributeInfo.dataType().getTypeCategory()); |
| } |
| } |
| } |
| |
| private TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances( |
| Collection<IReferenceableInstance> instances) throws AtlasException { |
| |
| List<ITypedReferenceableInstance> instancesToCreate = new ArrayList<>(); |
| List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>(); |
| |
| for (IReferenceableInstance instance : instances) { |
| LOG.debug("Discovering instance to create/update for {}", instance.toShortString()); |
| ITypedReferenceableInstance newInstance; |
| Id id = instance.getId(); |
| |
| if (!idToVertexMap.containsKey(id)) { |
| Vertex instanceVertex; |
| if (id.isAssigned()) { // has a GUID |
| LOG.debug("Instance has an assigned id {}", instance.getId()._getId()); |
| instanceVertex = graphHelper.getVertexForGUID(id.id); |
| if (!(instance instanceof ReferenceableInstance)) { |
| throw new IllegalStateException( |
| String.format("%s is not of type ITypedReferenceableInstance", instance.toShortString())); |
| } |
| newInstance = (ITypedReferenceableInstance) instance; |
| instancesToUpdate.add(newInstance); |
| |
| } else { |
| //Check if there is already an instance with the same unique attribute value |
| ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName()); |
| instanceVertex = graphHelper.getVertexForInstanceByUniqueAttribute(classType, instance); |
| |
| //no entity with the given unique attribute, create new |
| if (instanceVertex == null) { |
| LOG.debug("Creating new vertex for instance {}", instance.toShortString()); |
| newInstance = classType.convert(instance, Multiplicity.REQUIRED); |
| instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames()); |
| instancesToCreate.add(newInstance); |
| |
| //Map only unique attributes for cases of circular references |
| mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE); |
| |
| } else { |
| LOG.debug("Re-using existing vertex {} for instance {}", string(instanceVertex), instance.toShortString()); |
| if (!(instance instanceof ReferenceableInstance)) { |
| throw new IllegalStateException( |
| String.format("%s is not of type ITypedReferenceableInstance", instance.toShortString())); |
| } |
| newInstance = (ITypedReferenceableInstance) instance; |
| instancesToUpdate.add(newInstance); |
| } |
| } |
| |
| //Set the id in the new instance |
| idToVertexMap.put(id, instanceVertex); |
| } |
| } |
| return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate); |
| } |
| |
| private void addFullTextProperty(List<ITypedReferenceableInstance> instances, FullTextMapper fulltextMapper) throws AtlasException { |
| for (ITypedReferenceableInstance typedInstance : instances) { // Traverse |
| Vertex instanceVertex = getClassVertex(typedInstance); |
| String fullText = fulltextMapper.mapRecursive(instanceVertex, true); |
| GraphHelper.setProperty(instanceVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); |
| } |
| } |
| |
| private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) |
| throws AtlasException { |
| for (String traitName : typedInstance.getTraits()) { |
| LOG.debug("mapping trait {}", traitName); |
| GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName); |
| ITypedStruct traitInstance = (ITypedStruct) typedInstance.getTrait(traitName); |
| |
| // add the attributes for the trait instance |
| mapTraitInstanceToVertex(traitInstance, classType, instanceVertex); |
| } |
| } |
| |
| /******************************************** ARRAY **************************************************/ |
| |
| private void mapArrayCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, |
| AttributeInfo attributeInfo, Operation operation) throws AtlasException { |
| LOG.debug("Mapping instance {} for array attribute {} vertex {}", typedInstance.toShortString(), |
| attributeInfo.name, string(instanceVertex)); |
| |
| List newElements = (List) typedInstance.get(attributeInfo.name); |
| |
| String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); |
| List<String> currentElements = GraphHelper.getProperty(instanceVertex, propertyName); |
| IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType(); |
| List<Object> newElementsCreated = new ArrayList<>(); |
| |
| if (CollectionUtils.isNotEmpty(newElements)) { |
| for (int index = 0; index < newElements.size(); index++) { |
| String currentElement = (currentElements != null && index < currentElements.size()) ? |
| currentElements.get(index) : null; |
| LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index, |
| currentElement, newElements.get(index)); |
| Object newEntry = addOrUpdateCollectionEntry(instanceVertex, attributeInfo, elementType, |
| newElements.get(index), currentElement, propertyName, operation); |
| newElementsCreated.add(newEntry); |
| } |
| } |
| |
| List<String> additionalEdges = removeUnusedEntries(instanceVertex, propertyName, currentElements, |
| newElementsCreated, elementType, attributeInfo); |
| newElementsCreated.addAll(additionalEdges); |
| |
| // for dereference on way out |
| GraphHelper.setProperty(instanceVertex, propertyName, newElementsCreated); |
| } |
| |
| //Removes unused edges from the old collection, compared to the new collection |
| private List<String> removeUnusedEntries(Vertex instanceVertex, String edgeLabel, |
| Collection<String> currentEntries, |
| Collection<Object> newEntries, |
| IDataType entryType, AttributeInfo attributeInfo) throws AtlasException { |
| if (currentEntries != null && !currentEntries.isEmpty()) { |
| LOG.debug("Removing unused entries from the old collection"); |
| if (entryType.getTypeCategory() == DataTypes.TypeCategory.STRUCT |
| || entryType.getTypeCategory() == DataTypes.TypeCategory.CLASS) { |
| |
| //Remove the edges for (current edges - new edges) |
| List<String> cloneElements = new ArrayList<>(currentEntries); |
| cloneElements.removeAll(newEntries); |
| List<String> additionalElements = new ArrayList<>(); |
| LOG.debug("Removing unused entries from the old collection - {}", cloneElements); |
| |
| if (!cloneElements.isEmpty()) { |
| for (String edgeIdForDelete : cloneElements) { |
| Edge edge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, edgeIdForDelete); |
| if(edge != null) { |
| boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), |
| attributeInfo.isComposite, true); |
| if (!deleted) { |
| additionalElements.add(edgeIdForDelete); |
| } |
| } |
| } |
| } |
| return additionalElements; |
| } |
| } |
| return new ArrayList<>(); |
| } |
| |
| /******************************************** MAP **************************************************/ |
| |
| private void mapMapCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, |
| AttributeInfo attributeInfo, Operation operation) throws AtlasException { |
| LOG.debug("Mapping instance {} to vertex {} for attribute {}", typedInstance.toShortString(), string(instanceVertex), |
| attributeInfo.name); |
| @SuppressWarnings("unchecked") Map<Object, Object> newAttribute = |
| (Map<Object, Object>) typedInstance.get(attributeInfo.name); |
| |
| IDataType elementType = ((DataTypes.MapType) attributeInfo.dataType()).getValueType(); |
| String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); |
| |
| Map<String, String> currentMap = new HashMap<>(); |
| Map<String, Object> newMap = new HashMap<>(); |
| |
| List<String> currentKeys = GraphHelper.getProperty(instanceVertex, propertyName); |
| if (currentKeys != null && !currentKeys.isEmpty()) { |
| for (String key : currentKeys) { |
| String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, key); |
| String propertyValueForKey = GraphHelper.getProperty(instanceVertex, propertyNameForKey).toString(); |
| currentMap.put(key, propertyValueForKey); |
| } |
| } |
| |
| if (MapUtils.isNotEmpty(newAttribute)) { |
| for (Map.Entry entry : newAttribute.entrySet()) { |
| String keyStr = entry.getKey().toString(); |
| String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, keyStr); |
| |
| Object newEntry = addOrUpdateCollectionEntry(instanceVertex, attributeInfo, elementType, |
| entry.getValue(), currentMap.get(keyStr), propertyNameForKey, operation); |
| |
| //Add/Update/Remove property value |
| GraphHelper.setProperty(instanceVertex, propertyNameForKey, newEntry); |
| newMap.put(keyStr, newEntry); |
| } |
| } |
| |
| Map<String, String> additionalMap = |
| removeUnusedMapEntries(instanceVertex, propertyName, currentMap, newMap, elementType, attributeInfo); |
| |
| Set<String> newKeys = new HashSet<>(newMap.keySet()); |
| newKeys.addAll(additionalMap.keySet()); |
| |
| // for dereference on way out |
| GraphHelper.setProperty(instanceVertex, propertyName, new ArrayList<>(newKeys)); |
| } |
| |
| //Remove unused entries from map |
| private Map<String, String> removeUnusedMapEntries(Vertex instanceVertex, String propertyName, |
| Map<String, String> currentMap, |
| Map<String, Object> newMap, IDataType elementType, |
| AttributeInfo attributeInfo) |
| throws AtlasException { |
| boolean reference = (elementType.getTypeCategory() == DataTypes.TypeCategory.STRUCT |
| || elementType.getTypeCategory() == DataTypes.TypeCategory.CLASS); |
| Map<String, String> additionalMap = new HashMap<>(); |
| |
| for (String currentKey : currentMap.keySet()) { |
| boolean shouldDeleteKey = !newMap.containsKey(currentKey); |
| if (reference) { |
| String currentEdge = currentMap.get(currentKey); |
| //Delete the edge reference if its not part of new edges created/updated |
| if (!newMap.values().contains(currentEdge)) { |
| String edgeLabel = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey); |
| Edge edge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, currentMap.get(currentKey)); |
| if(edge != null) { |
| boolean deleted = |
| deleteHandler.deleteEdgeReference(edge, elementType.getTypeCategory(), attributeInfo.isComposite, true); |
| if (!deleted) { |
| additionalMap.put(currentKey, currentEdge); |
| shouldDeleteKey = false; |
| } |
| } |
| } |
| } |
| |
| if (shouldDeleteKey) { |
| String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey); |
| graphHelper.setProperty(instanceVertex, propertyNameForKey, null); |
| } |
| } |
| return additionalMap; |
| } |
| |
| /******************************************** ARRAY & MAP **************************************************/ |
| |
| private Object addOrUpdateCollectionEntry(Vertex instanceVertex, AttributeInfo attributeInfo, |
| IDataType elementType, Object newAttributeValue, String currentValue, |
| String propertyName, Operation operation) |
| throws AtlasException { |
| |
| switch (elementType.getTypeCategory()) { |
| case PRIMITIVE: |
| case ENUM: |
| return newAttributeValue != null ? newAttributeValue : null; |
| |
| case ARRAY: |
| case MAP: |
| case TRAIT: |
| // do nothing |
| return null; |
| |
| case STRUCT: |
| case CLASS: |
| final String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName; |
| Edge currentEdge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, currentValue); |
| return addOrUpdateReference(instanceVertex, attributeInfo, elementType, newAttributeValue, currentEdge, |
| edgeLabel, operation); |
| |
| default: |
| throw new IllegalArgumentException("Unknown type category: " + elementType.getTypeCategory()); |
| } |
| } |
| |
| private String addOrUpdateReference(Vertex instanceVertex, AttributeInfo attributeInfo, |
| IDataType attributeType, Object newAttributeValue, Edge currentEdge, |
| String edgeLabel, Operation operation) throws AtlasException { |
| switch (attributeType.getTypeCategory()) { |
| case STRUCT: |
| return addOrUpdateStruct(instanceVertex, attributeInfo, (ITypedStruct) newAttributeValue, currentEdge, |
| edgeLabel, operation); |
| |
| case CLASS: |
| return addOrUpdateClassVertex(instanceVertex, currentEdge, |
| (ITypedReferenceableInstance) newAttributeValue, attributeInfo, edgeLabel); |
| |
| default: |
| throw new IllegalArgumentException("Unknown type category: " + attributeType.getTypeCategory()); |
| } |
| } |
| /******************************************** STRUCT **************************************************/ |
| |
| private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, |
| ITypedStruct newAttributeValue, Edge currentEdge, |
| String edgeLabel, Operation operation) throws AtlasException { |
| String newEdgeId = null; |
| if (currentEdge != null && newAttributeValue != null) { |
| //update |
| updateStructVertex(newAttributeValue, currentEdge, operation); |
| newEdgeId = currentEdge.getId().toString(); |
| } else if (currentEdge == null && newAttributeValue != null) { |
| //add |
| Edge newEdge = addStructVertex(newAttributeValue, instanceVertex, attributeInfo, edgeLabel); |
| newEdgeId = newEdge.getId().toString(); |
| } |
| return newEdgeId; |
| } |
| |
| private Edge addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, |
| AttributeInfo attributeInfo, String edgeLabel) throws AtlasException { |
| // add a new vertex for the struct or trait instance |
| Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, |
| Collections.<String>emptySet()); // no super types for struct type |
| LOG.debug("created vertex {} for struct {} value {}", string(structInstanceVertex), attributeInfo.name, |
| structInstance.toShortString()); |
| |
| // map all the attributes to this new vertex |
| mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, |
| Operation.CREATE); |
| // add an edge to the newly created vertex from the parent |
| Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, structInstanceVertex, edgeLabel); |
| |
| return newEdge; |
| } |
| |
| private void updateStructVertex(ITypedStruct newAttributeValue, Edge currentEdge, |
| Operation operation) throws AtlasException { |
| //Already existing vertex. Update |
| Vertex structInstanceVertex = currentEdge.getVertex(Direction.IN); |
| |
| LOG.debug("Updating struct vertex {} with struct {}", string(structInstanceVertex), newAttributeValue.toShortString()); |
| |
| // Update attributes |
| final MessageDigest digester = MD5Utils.getDigester(); |
| String newSignature = newAttributeValue.getSignatureHash(digester); |
| String curSignature = GraphHelper.getProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY); |
| |
| if (!newSignature.equals(curSignature)) { |
| //Update struct vertex instance only if there is a change |
| LOG.debug("Updating struct {} since signature has changed {} {} ", newAttributeValue, curSignature, newSignature); |
| mapInstanceToVertex(newAttributeValue, structInstanceVertex, newAttributeValue.fieldMapping().fields, false, operation); |
| GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature)); |
| } |
| } |
| |
| /******************************************** CLASS **************************************************/ |
| |
| private String addOrUpdateClassVertex(Vertex instanceVertex, Edge currentEdge, |
| ITypedReferenceableInstance newAttributeValue, AttributeInfo attributeInfo, |
| String edgeLabel) throws AtlasException { |
| Vertex newReferenceVertex = getClassVertex(newAttributeValue); |
| if(newReferenceVertex == null && newAttributeValue != null) { |
| LOG.error("Could not find vertex for Class Reference " + newAttributeValue); |
| throw new EntityNotFoundException("Could not find vertex for Class Reference " + newAttributeValue); |
| } |
| |
| String newEdgeId = null; |
| if (currentEdge != null && newAttributeValue != null) { |
| newEdgeId = updateClassEdge(instanceVertex, currentEdge, newAttributeValue, newReferenceVertex, |
| attributeInfo, edgeLabel); |
| } else if (currentEdge == null && newAttributeValue != null){ |
| Edge newEdge = addClassEdge(instanceVertex, newReferenceVertex, edgeLabel); |
| newEdgeId = newEdge.getId().toString(); |
| } |
| return newEdgeId; |
| } |
| |
| private Edge addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException { |
| // add an edge to the class vertex from the instance |
| return graphHelper.getOrCreateEdge(instanceVertex, toVertex, edgeLabel); |
| } |
| |
| private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { |
| Vertex referenceVertex = null; |
| Id id = null; |
| if (typedReference != null) { |
| id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId(); |
| if (id.isAssigned()) { |
| referenceVertex = graphHelper.getVertexForGUID(id.id); |
| } else { |
| referenceVertex = idToVertexMap.get(id); |
| } |
| } |
| |
| return referenceVertex; |
| } |
| |
| private Id getId(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { |
| if (typedReference == null) { |
| throw new IllegalArgumentException("typedReference must be non-null"); |
| } |
| Id id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId(); |
| |
| if (id.isUnassigned()) { |
| Vertex classVertex = idToVertexMap.get(id); |
| String guid = GraphHelper.getIdFromVertex(classVertex); |
| id = new Id(guid, 0, typedReference.getTypeName()); |
| } |
| return id; |
| } |
| |
| |
| private String updateClassEdge(Vertex instanceVertex, Edge currentEdge, |
| ITypedReferenceableInstance newAttributeValue, |
| Vertex newVertex, AttributeInfo attributeInfo, |
| String edgeLabel) throws AtlasException { |
| LOG.debug("Updating {} for reference attribute {}", string(currentEdge), attributeInfo.name); |
| // Update edge if it exists |
| Vertex currentVertex = currentEdge.getVertex(Direction.IN); |
| String currentEntityId = GraphHelper.getIdFromVertex(currentVertex); |
| String newEntityId = getId(newAttributeValue).id; |
| String newEdgeId = currentEdge.getId().toString(); |
| if (!currentEntityId.equals(newEntityId)) { |
| // add an edge to the class vertex from the instance |
| if (newVertex != null) { |
| Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, newVertex, edgeLabel); |
| newEdgeId = newEdge.getId().toString(); |
| } |
| } |
| |
| return newEdgeId; |
| } |
| |
| /******************************************** TRAITS ****************************************************/ |
| |
| void mapTraitInstanceToVertex(ITypedStruct traitInstance, IDataType entityType, Vertex parentInstanceVertex) |
| throws AtlasException { |
| // add a new vertex for the struct or trait instance |
| final String traitName = traitInstance.getTypeName(); |
| Vertex traitInstanceVertex = graphHelper.createVertexWithoutIdentity(traitInstance.getTypeName(), null, |
| typeSystem.getDataType(TraitType.class, traitName).getAllSuperTypeNames()); |
| LOG.debug("created vertex {} for trait {}", string(traitInstanceVertex), traitName); |
| |
| // map all the attributes to this newly created vertex |
| mapInstanceToVertex(traitInstance, traitInstanceVertex, traitInstance.fieldMapping().fields, false, Operation.CREATE); |
| |
| // add an edge to the newly created vertex from the parent |
| String relationshipLabel = GraphHelper.getTraitLabel(entityType.getName(), traitName); |
| graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel); |
| } |
| |
| /******************************************** PRIMITIVES **************************************************/ |
| |
| private void mapPrimitiveOrEnumToVertex(ITypedInstance typedInstance, Vertex instanceVertex, |
| AttributeInfo attributeInfo) throws AtlasException { |
| Object attrValue = typedInstance.get(attributeInfo.name); |
| |
| final String vertexPropertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); |
| Object propertyValue = null; |
| |
| if (attrValue == null) { |
| propertyValue = null; |
| } else if (attributeInfo.dataType() == DataTypes.STRING_TYPE) { |
| propertyValue = typedInstance.getString(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.SHORT_TYPE) { |
| propertyValue = typedInstance.getShort(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.INT_TYPE) { |
| propertyValue = typedInstance.getInt(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.BIGINTEGER_TYPE) { |
| propertyValue = typedInstance.getBigInt(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.BOOLEAN_TYPE) { |
| propertyValue = typedInstance.getBoolean(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.BYTE_TYPE) { |
| propertyValue = typedInstance.getByte(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.LONG_TYPE) { |
| propertyValue = typedInstance.getLong(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.FLOAT_TYPE) { |
| propertyValue = typedInstance.getFloat(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.DOUBLE_TYPE) { |
| propertyValue = typedInstance.getDouble(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.BIGDECIMAL_TYPE) { |
| propertyValue = typedInstance.getBigDecimal(attributeInfo.name); |
| } else if (attributeInfo.dataType() == DataTypes.DATE_TYPE) { |
| final Date dateVal = typedInstance.getDate(attributeInfo.name); |
| //Convert Property value to Long while persisting |
| if (dateVal != null) { |
| propertyValue = dateVal.getTime(); |
| } |
| } else if (attributeInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.ENUM) { |
| if (attrValue != null) { |
| propertyValue = ((EnumValue) attrValue).value; |
| } |
| } |
| |
| GraphHelper.setProperty(instanceVertex, vertexPropertyName, propertyValue); |
| } |
| } |