| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.repository.store.graph.v2; |
| |
| |
| import org.apache.atlas.AtlasErrorCode; |
| import org.apache.atlas.GraphTransactionInterceptor; |
| import org.apache.atlas.RequestContext; |
| import org.apache.atlas.annotation.GraphTransaction; |
| import org.apache.atlas.authorize.AtlasAdminAccessRequest; |
| import org.apache.atlas.authorize.AtlasAuthorizationUtils; |
| import org.apache.atlas.authorize.AtlasEntityAccessRequest; |
| import org.apache.atlas.authorize.AtlasEntityAccessRequest.AtlasEntityAccessRequestBuilder; |
| import org.apache.atlas.authorize.AtlasPrivilege; |
| import org.apache.atlas.exception.AtlasBaseException; |
| import org.apache.atlas.model.TypeCategory; |
| import org.apache.atlas.model.instance.AtlasCheckStateRequest; |
| import org.apache.atlas.model.instance.AtlasCheckStateResult; |
| import org.apache.atlas.model.instance.AtlasClassification; |
| import org.apache.atlas.model.instance.AtlasEntity; |
| import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; |
| import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; |
| import org.apache.atlas.model.instance.AtlasEntity.Status; |
| import org.apache.atlas.model.instance.AtlasEntityHeader; |
| import org.apache.atlas.model.instance.AtlasEntityHeaders; |
| import org.apache.atlas.model.instance.AtlasObjectId; |
| import org.apache.atlas.model.instance.EntityMutationResponse; |
| import org.apache.atlas.model.typedef.AtlasBaseTypeDef; |
| import org.apache.atlas.repository.graph.GraphHelper; |
| import org.apache.atlas.repository.graphdb.AtlasGraph; |
| import org.apache.atlas.repository.graphdb.AtlasVertex; |
| import org.apache.atlas.repository.store.graph.AtlasEntityStore; |
| import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; |
| import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; |
| import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; |
| import org.apache.atlas.store.DeleteType; |
| import org.apache.atlas.type.AtlasArrayType; |
| import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute; |
| import org.apache.atlas.type.AtlasClassificationType; |
| import org.apache.atlas.type.AtlasEntityType; |
| import org.apache.atlas.type.AtlasEnumType; |
| import org.apache.atlas.type.AtlasStructType.AtlasAttribute; |
| import org.apache.atlas.type.AtlasType; |
| import org.apache.atlas.type.AtlasTypeRegistry; |
| import org.apache.atlas.type.AtlasTypeUtil; |
| import org.apache.atlas.bulkimport.BulkImportResponse; |
| import org.apache.atlas.util.FileUtils; |
| import org.apache.atlas.utils.AtlasEntityUtil; |
| import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; |
| import org.apache.atlas.utils.AtlasPerfTracer; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.collections.MapUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.stereotype.Component; |
| |
| import javax.inject.Inject; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| |
| import static java.lang.Boolean.FALSE; |
| import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; |
| import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE; |
| import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; |
| import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY; |
| import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes; |
| import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; |
| import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete; |
| import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels; |
| |
| |
| @Component |
| public class AtlasEntityStoreV2 implements AtlasEntityStore { |
| private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); |
| private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore"); |
| |
| private final AtlasGraph graph; |
| private final DeleteHandlerDelegate deleteDelegate; |
| private final AtlasTypeRegistry typeRegistry; |
| private final IAtlasEntityChangeNotifier entityChangeNotifier; |
| private final EntityGraphMapper entityGraphMapper; |
| private final EntityGraphRetriever entityRetriever; |
| |
/**
 * Builds the entity store from its injected collaborators.
 *
 * In addition to storing the supplied dependencies, a shared {@link EntityGraphRetriever}
 * is created from the given graph and type registry.
 *
 * @param graph                graph the store operates on
 * @param deleteDelegate       delegate kept for delete handling
 * @param typeRegistry         registry used to resolve types
 * @param entityChangeNotifier notifier invoked after entity mutations
 * @param entityGraphMapper    mapper used for classification/entity graph changes
 */
@Inject
public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
                          IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
    // graph-facing collaborators
    this.graph           = graph;
    this.typeRegistry    = typeRegistry;
    this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);

    // mutation-facing collaborators
    this.entityGraphMapper    = entityGraphMapper;
    this.deleteDelegate       = deleteDelegate;
    this.entityChangeNotifier = entityChangeNotifier;
}
| |
| @Override |
| @GraphTransaction |
| public List<String> getEntityGUIDS(final String typename) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getEntityGUIDS({})", typename); |
| } |
| |
| if (StringUtils.isEmpty(typename) || !typeRegistry.isRegisteredType(typename)) { |
| throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME); |
| } |
| |
| List<String> ret = AtlasGraphUtilsV2.findEntityGUIDsByType(graph, typename); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getEntityGUIDS({})", typename); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException { |
| return getById(guid, false, false); |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntityWithExtInfo getById(final String guid, final boolean isMinExtInfo, boolean ignoreRelationships) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getById({}, {})", guid, isMinExtInfo); |
| } |
| |
| EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry, ignoreRelationships); |
| |
| AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid, isMinExtInfo); |
| |
| if (ret == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: guid=", guid); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getById({}, {}): {}", guid, isMinExtInfo, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntityHeader getHeaderById(final String guid) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getHeaderById({})", guid); |
| } |
| |
| EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry); |
| |
| AtlasEntityHeader ret = entityRetriever.toAtlasEntityHeader(guid); |
| |
| if (ret == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, ret), "read entity: guid=", guid); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getHeaderById({}): {}", guid, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException { |
| return getByIds(guids, false, false); |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntitiesWithExtInfo getByIds(List<String> guids, boolean isMinExtInfo, boolean ignoreRelationships) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getByIds({}, {})", guids, isMinExtInfo); |
| } |
| |
| EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry, ignoreRelationships); |
| |
| AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids, isMinExtInfo); |
| |
| if(ret != null){ |
| for(String guid : guids){ |
| AtlasEntity entity = ret.getEntity(guid); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: guid=", guid); |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getByIds({}, {}): {}", guids, isMinExtInfo, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntitiesWithExtInfo getEntitiesByUniqueAttributes(AtlasEntityType entityType, List<Map<String, Object>> uniqueAttributes , boolean isMinExtInfo, boolean ignoreRelationships) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getEntitiesByUniqueAttributes({}, {})", entityType.getTypeName(), uniqueAttributes); |
| } |
| |
| EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry, ignoreRelationships); |
| |
| AtlasEntitiesWithExtInfo ret = entityRetriever.getEntitiesByUniqueAttributes(entityType.getTypeName(), uniqueAttributes, isMinExtInfo); |
| |
| if (ret != null && ret.getEntities() != null) { |
| for (AtlasEntity entity : ret.getEntities()) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: typeName=", entityType.getTypeName(), ", guid=", entity.getGuid()); |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getEntitiesByUniqueAttributes({}, {}): {}", entityType.getTypeName(), uniqueAttributes, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) |
| throws AtlasBaseException { |
| return getByUniqueAttributes(entityType, uniqAttributes, false, false); |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes, boolean isMinExtInfo, boolean ignoreRelationships) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes); |
| |
| EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry, ignoreRelationships); |
| |
| AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex, isMinExtInfo); |
| |
| if (ret == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(), |
| uniqAttributes.toString()); |
| } |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasEntityHeader getEntityHeaderByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> getEntityHeaderByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes); |
| |
| EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry); |
| |
| AtlasEntityHeader ret = entityRetriever.toAtlasEntityHeader(entityVertex); |
| |
| if (ret == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(), |
| uniqAttributes.toString()); |
| } |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, ret), "read entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== getEntityHeaderByUniqueAttributes({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Check state of entities in the store |
| * @param request AtlasCheckStateRequest |
| * @return AtlasCheckStateResult |
| * @throws AtlasBaseException |
| */ |
| @Override |
| @GraphTransaction |
| public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> checkState({})", request); |
| } |
| |
| EntityStateChecker entityStateChecker = new EntityStateChecker(graph, typeRegistry); |
| |
| AtlasCheckStateResult ret = entityStateChecker.checkState(request); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== checkState({}, {})", request, ret); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException { |
| return createOrUpdate(entityStream, isPartialUpdate, false, false); |
| } |
| |
| @Override |
| @GraphTransaction(logRollback = false) |
| public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException { |
| return createOrUpdate(entityStream, false, true, true); |
| } |
| |
| @Override |
| public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException { |
| return createOrUpdate(entityStream, false, true, true); |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate); |
| } |
| |
| if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity"); |
| } |
| |
| final String guid; |
| |
| if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) { |
| guid = objectId.getGuid(); |
| } else { |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName()); |
| |
| if (entityType == null) { |
| throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName()); |
| } |
| |
| guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(graph, typeRegistry.getEntityTypeByName(objectId.getTypeName()), objectId.getUniqueAttributes()); |
| } |
| |
| AtlasEntity entity = updatedEntityInfo.getEntity(); |
| |
| entity.setGuid(guid); |
| |
| return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false, false); |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes, |
| AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> updateByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes); |
| } |
| |
| if (updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update."); |
| } |
| |
| String guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(graph, entityType, uniqAttributes); |
| AtlasEntity entity = updatedEntityInfo.getEntity(); |
| |
| entity.setGuid(guid); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)), "update entity ByUniqueAttributes"); |
| |
| return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false, false); |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue) |
| throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue); |
| } |
| |
| AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); |
| AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); |
| AtlasAttribute attr = entityType.getAttribute(attrName); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, entity), "update entity ByUniqueAttributes : guid=", guid ); |
| |
| if (attr == null) { |
| attr = entityType.getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(attrValue)); |
| |
| if (attr == null) { |
| throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName()); |
| } |
| } |
| |
| AtlasType attrType = attr.getAttributeType(); |
| AtlasEntity updateEntity = new AtlasEntity(); |
| |
| updateEntity.setGuid(guid); |
| updateEntity.setTypeName(entity.getTypeName()); |
| |
| switch (attrType.getTypeCategory()) { |
| case PRIMITIVE: |
| updateEntity.setAttribute(attrName, attrValue); |
| break; |
| case OBJECT_ID_TYPE: |
| AtlasObjectId objId; |
| |
| if (attrValue instanceof String) { |
| objId = new AtlasObjectId((String) attrValue, attr.getAttributeDef().getTypeName()); |
| } else { |
| objId = (AtlasObjectId) attrType.getNormalizedValue(attrValue); |
| } |
| |
| updateEntity.setAttribute(attrName, objId); |
| break; |
| |
| default: |
| throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName()); |
| } |
| |
| return createOrUpdate(new AtlasEntityStream(updateEntity), true, false, false); |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException { |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); |
| AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (vertex != null) { |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid); |
| |
| deletionCandidates.add(vertex); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| // Entity does not exist - treat as non-error, since the caller |
| // wanted to delete the entity and it's already gone. |
| LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); |
| } |
| } |
| |
| EntityMutationResponse ret = deleteVertices(deletionCandidates); |
| |
| // Notify the change listeners |
| entityChangeNotifier.onEntitiesMutated(ret, false); |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse deleteByIds(final List<String> guids) throws AtlasBaseException { |
| if (CollectionUtils.isEmpty(guids)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); |
| } |
| |
| Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); |
| |
| for (String guid : guids) { |
| AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (vertex == null) { |
| if (LOG.isDebugEnabled()) { |
| // Entity does not exist - treat as non-error, since the caller |
| // wanted to delete the entity and it's already gone. |
| LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); |
| } |
| |
| continue; |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid); |
| |
| deletionCandidates.add(vertex); |
| } |
| |
| if (deletionCandidates.isEmpty()) { |
| LOG.info("No deletion candidate entities were found for guids %s", guids); |
| } |
| |
| EntityMutationResponse ret = deleteVertices(deletionCandidates); |
| |
| // Notify the change listeners |
| entityChangeNotifier.onEntitiesMutated(ret, false); |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException { |
| if (CollectionUtils.isEmpty(guids)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); |
| } |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_PURGE), "purge entity: guids=", guids); |
| Collection<AtlasVertex> purgeCandidates = new ArrayList<>(); |
| |
| for (String guid : guids) { |
| AtlasVertex vertex = AtlasGraphUtilsV2.findDeletedByGuid(graph, guid); |
| |
| if (vertex == null) { |
| // Entity does not exist - treat as non-error, since the caller |
| // wanted to delete the entity and it's already gone. |
| LOG.warn("Purge request ignored for non-existent/active entity with guid " + guid); |
| |
| continue; |
| } |
| |
| purgeCandidates.add(vertex); |
| } |
| |
| if (purgeCandidates.isEmpty()) { |
| LOG.info("No purge candidate entities were found for guids: " + guids + " which is already deleted"); |
| } |
| |
| EntityMutationResponse ret = purgeVertices(purgeCandidates); |
| |
| // Notify the change listeners |
| entityChangeNotifier.onEntitiesMutated(ret, false); |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { |
| if (MapUtils.isEmpty(uniqAttributes)) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString()); |
| } |
| |
| Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); |
| AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(graph, entityType, uniqAttributes); |
| |
| if (vertex != null) { |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); |
| |
| deletionCandidates.add(vertex); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| // Entity does not exist - treat as non-error, since the caller |
| // wanted to delete the entity and it's already gone. |
| LOG.debug("Deletion request ignored for non-existent entity with uniqueAttributes " + uniqAttributes); |
| } |
| } |
| |
| EntityMutationResponse ret = deleteVertices(deletionCandidates); |
| |
| // Notify the change listeners |
| entityChangeNotifier.onEntitiesMutated(ret, false); |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException{ |
| return AtlasGraphUtilsV2.getGuidByUniqueAttributes(graph, entityType, uniqAttributes); |
| } |
| |
| @Override |
| @GraphTransaction |
| public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding classifications={} to entity={}", classifications, guid); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); |
| } |
| |
| if (CollectionUtils.isEmpty(classifications)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); |
| } |
| |
| GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| |
| for (AtlasClassification classification : classifications) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification), |
| "add classification: guid=", guid, ", classification=", classification.getTypeName()); |
| } |
| |
| EntityMutationContext context = new EntityMutationContext(); |
| |
| context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName())); |
| |
| |
| for (AtlasClassification classification : classifications) { |
| validateAndNormalize(classification); |
| } |
| |
| // validate if entity, not already associated with classifications |
| validateEntityAssociations(guid, classifications); |
| |
| entityGraphMapper.addClassifications(context, guid, classifications); |
| } |
| |
| @Override |
| @GraphTransaction |
| public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Updating classifications={} for entity={}", classifications, guid); |
| } |
| |
| AtlasPerfTracer perf = null; |
| |
| if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { |
| AtlasPerfTracer.getPerfTracer(PERF_LOG, "AtlasEntityStoreV2.updateClassification()"); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified"); |
| } |
| |
| if (CollectionUtils.isEmpty(classifications)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); |
| } |
| |
| GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| |
| for (AtlasClassification classification : classifications) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName()); |
| } |
| |
| EntityMutationContext context = new EntityMutationContext(); |
| |
| context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName())); |
| |
| |
| for (AtlasClassification classification : classifications) { |
| validateAndNormalize(classification); |
| } |
| |
| entityGraphMapper.updateClassifications(context, guid, classifications); |
| |
| AtlasPerfTracer.log(perf); |
| } |
| |
| @Override |
| @GraphTransaction |
| public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding classification={} to entities={}", classification, guids); |
| } |
| |
| if (CollectionUtils.isEmpty(guids)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); |
| } |
| if (classification == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified"); |
| } |
| |
| EntityMutationContext context = new EntityMutationContext(); |
| |
| GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids); |
| |
| for (String guid : guids) { |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification), |
| "add classification: guid=", guid, ", classification=", classification.getTypeName()); |
| |
| context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName())); |
| } |
| |
| |
| validateAndNormalize(classification); |
| |
| List<AtlasClassification> classifications = Collections.singletonList(classification); |
| |
| for (String guid : guids) { |
| validateEntityAssociations(guid, classifications); |
| |
| entityGraphMapper.addClassifications(context, guid, classifications); |
| } |
| } |
| |
| @Override |
| @GraphTransaction |
| public void deleteClassification(final String guid, final String classificationName) throws AtlasBaseException { |
| deleteClassification(guid, classificationName, null); |
| } |
| |
| @Override |
| @GraphTransaction |
| public void deleteClassification(final String guid, final String classificationName, final String associatedEntityGuid) throws AtlasBaseException { |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); |
| } |
| if (StringUtils.isEmpty(classificationName)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications not specified"); |
| } |
| |
| GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); |
| |
| // verify authorization only for removal of directly associated classification and not propagated one. |
| if (StringUtils.isEmpty(associatedEntityGuid) || guid.equals(associatedEntityGuid)) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION, |
| entityHeader, new AtlasClassification(classificationName)), |
| "remove classification: guid=", guid, ", classification=", classificationName); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Deleting classification={} from entity={}", classificationName, guid); |
| } |
| |
| |
| entityGraphMapper.deleteClassification(guid, classificationName, associatedEntityGuid); |
| } |
| |
| |
| @GraphTransaction |
| public List<AtlasClassification> retrieveClassifications(String guid) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Retriving classifications for entity={}", guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); |
| |
| return entityHeader.getClassifications(); |
| } |
| |
| |
| @Override |
| @GraphTransaction |
| public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Getting classifications for entity={}", guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); |
| |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, entityHeader), "get classifications: guid=", guid); |
| |
| return entityHeader.getClassifications(); |
| } |
| |
| @Override |
| @GraphTransaction |
| public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Getting classifications for entities={}", guid); |
| } |
| |
| AtlasClassification ret = null; |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); |
| |
| if (CollectionUtils.isNotEmpty(entityHeader.getClassifications())) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, entityHeader), "get classification: guid=", guid, ", classification=", classificationName); |
| |
| for (AtlasClassification classification : entityHeader.getClassifications()) { |
| if (!StringUtils.equalsIgnoreCase(classification.getTypeName(), classificationName)) { |
| continue; |
| } |
| |
| if (StringUtils.isEmpty(classification.getEntityGuid()) || StringUtils.equalsIgnoreCase(classification.getEntityGuid(), guid)) { |
| ret = classification; |
| break; |
| } else if (ret == null) { |
| ret = classification; |
| } |
| } |
| } |
| |
| if (ret == null) { |
| throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| @GraphTransaction |
| public String setClassifications(AtlasEntityHeaders entityHeaders) { |
| ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(graph, typeRegistry, this); |
| return associator.setClassifications(entityHeaders.getGuidHeaderMap()); |
| } |
| |
| @Override |
| @GraphTransaction |
| public void addOrUpdateBusinessAttributes(String guid, Map<String, Map<String, Object>> businessAttrbutes, boolean isOverwrite) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| |
| LOG.debug("==> addOrUpdateBusinessAttributes(guid={}, businessAttributes={}, isOverwrite={})", guid, businessAttrbutes, isOverwrite); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid is null/empty"); |
| } |
| |
| if (MapUtils.isEmpty(businessAttrbutes)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "businessAttributes is null/empty"); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| String typeName = getTypeName(entityVertex); |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| Map<String, Map<String, Object>> currEntityBusinessAttributes = entityRetriever.getBusinessMetadata(entityVertex); |
| Set<String> updatedBusinessMetadataNames = new HashSet<>(); |
| |
| for (String bmName : entityType.getBusinessAttributes().keySet()) { |
| Map<String, Object> bmAttrs = businessAttrbutes.get(bmName); |
| Map<String, Object> currBmAttrs = currEntityBusinessAttributes != null ? currEntityBusinessAttributes.get(bmName) : null; |
| |
| if (bmAttrs == null && !isOverwrite) { |
| continue; |
| } else if (MapUtils.isEmpty(bmAttrs) && MapUtils.isEmpty(currBmAttrs)) { // no change |
| continue; |
| } else if (Objects.equals(bmAttrs, currBmAttrs)) { // no change |
| continue; |
| } |
| |
| updatedBusinessMetadataNames.add(bmName); |
| } |
| |
| AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA, entityHeader); |
| |
| for (String bmName : updatedBusinessMetadataNames) { |
| requestBuilder.setBusinessMetadata(bmName); |
| |
| AtlasAuthorizationUtils.verifyAccess(requestBuilder.build(), "add/update business-metadata: guid=", guid, ", business-metadata-name=", bmName); |
| } |
| |
| validateBusinessAttributes(entityVertex, entityType, businessAttrbutes, isOverwrite); |
| |
| if (isOverwrite) { |
| entityGraphMapper.setBusinessAttributes(entityVertex, entityType, businessAttrbutes); |
| } else { |
| entityGraphMapper.addOrUpdateBusinessAttributes(entityVertex, entityType, businessAttrbutes); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== addOrUpdateBusinessAttributes(guid={}, businessAttributes={}, isOverwrite={})", guid, businessAttrbutes, isOverwrite); |
| } |
| } |
| |
| @Override |
| @GraphTransaction |
| public void removeBusinessAttributes(String guid, Map<String, Map<String, Object>> businessAttributes) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> removeBusinessAttributes(guid={}, businessAttributes={})", guid, businessAttributes); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid is null/empty"); |
| } |
| |
| if (MapUtils.isEmpty(businessAttributes)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "businessAttributes is null/empty"); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| String typeName = getTypeName(entityVertex); |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA, entityHeader); |
| |
| for (String bmName : businessAttributes.keySet()) { |
| requestBuilder.setBusinessMetadata(bmName); |
| |
| AtlasAuthorizationUtils.verifyAccess(requestBuilder.build(), "remove business-metadata: guid=", guid, ", business-metadata=", bmName); |
| } |
| |
| entityGraphMapper.removeBusinessAttributes(entityVertex, entityType, businessAttributes); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== removeBusinessAttributes(guid={}, businessAttributes={})", guid, businessAttributes); |
| } |
| } |
| |
| @Override |
| @GraphTransaction |
| public void setLabels(String guid, Set<String> labels) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> setLabels()"); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid is null/empty"); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| validateLabels(labels); |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| Set<String> addedLabels = Collections.emptySet(); |
| Set<String> removedLabels = Collections.emptySet(); |
| |
| if (CollectionUtils.isEmpty(entityHeader.getLabels())) { |
| addedLabels = labels; |
| } else if (CollectionUtils.isEmpty(labels)) { |
| removedLabels = entityHeader.getLabels(); |
| } else { |
| addedLabels = new HashSet<String>(CollectionUtils.subtract(labels, entityHeader.getLabels())); |
| removedLabels = new HashSet<String>(CollectionUtils.subtract(entityHeader.getLabels(), labels)); |
| } |
| |
| if (addedLabels != null) { |
| AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_ADD_LABEL, entityHeader); |
| |
| for (String label : addedLabels) { |
| requestBuilder.setLabel(label); |
| |
| AtlasAuthorizationUtils.verifyAccess(requestBuilder.build(), "add label: guid=", guid, ", label=", label); |
| } |
| } |
| |
| if (removedLabels != null) { |
| AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_LABEL, entityHeader); |
| |
| for (String label : removedLabels) { |
| requestBuilder.setLabel(label); |
| |
| AtlasAuthorizationUtils.verifyAccess(requestBuilder.build(), "remove label: guid=", guid, ", label=", label); |
| } |
| } |
| |
| entityGraphMapper.setLabels(entityVertex, labels); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== setLabels()"); |
| } |
| } |
| |
| @Override |
| @GraphTransaction |
| public void removeLabels(String guid, Set<String> labels) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> removeLabels()"); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid is null/empty"); |
| } |
| |
| if (CollectionUtils.isEmpty(labels)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "labels is null/empty"); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_LABEL, entityHeader); |
| |
| for (String label : labels) { |
| requestBuilder.setLabel(label); |
| |
| AtlasAuthorizationUtils.verifyAccess(requestBuilder.build(), "remove label: guid=", guid, ", label=", label); |
| } |
| |
| validateLabels(labels); |
| |
| entityGraphMapper.removeLabels(entityVertex, labels); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== removeLabels()"); |
| } |
| } |
| |
| @Override |
| @GraphTransaction |
| public void addLabels(String guid, Set<String> labels) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> addLabels()"); |
| } |
| |
| if (StringUtils.isEmpty(guid)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid is null/empty"); |
| } |
| |
| if (CollectionUtils.isEmpty(labels)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "labels is null/empty"); |
| } |
| |
| AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(graph, guid); |
| |
| if (entityVertex == null) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); |
| } |
| |
| AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); |
| AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_ADD_LABEL, entityHeader); |
| |
| for (String label : labels) { |
| requestBuilder.setLabel(label); |
| |
| AtlasAuthorizationUtils.verifyAccess(requestBuilder.build(), "add/update label: guid=", guid, ", label=", label); |
| } |
| |
| validateLabels(labels); |
| |
| entityGraphMapper.addLabels(entityVertex, labels); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== addLabels()"); |
| } |
| } |
| |
| private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications, boolean replaceBusinessAttributes) throws AtlasBaseException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> createOrUpdate()"); |
| } |
| |
| if (entityStream == null || !entityStream.hasNext()) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); |
| } |
| |
| AtlasPerfTracer perf = null; |
| |
| if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { |
| perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()"); |
| } |
| |
| MetricRecorder metric = RequestContext.get().startMetricRecord("createOrUpdate"); |
| |
| try { |
| final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate); |
| |
| // Check if authorized to create entities |
| if (!RequestContext.get().isImportInProgress()) { |
| for (AtlasEntity entity : context.getCreatedEntities()) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)), |
| "create entity: type=", entity.getTypeName()); |
| } |
| } |
| |
| // for existing entities, skip update if incoming entity doesn't have any change |
| if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) { |
| MetricRecorder checkForUnchangedEntities = RequestContext.get().startMetricRecord("checkForUnchangedEntities"); |
| |
| List<AtlasEntity> entitiesToSkipUpdate = null; |
| |
| for (AtlasEntity entity : context.getUpdatedEntities()) { |
| String guid = entity.getGuid(); |
| AtlasVertex vertex = context.getVertex(guid); |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); |
| boolean hasUpdates = false; |
| |
| if (!hasUpdates) { |
| hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import |
| } |
| |
| if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change |
| for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { |
| if (!entity.getAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated |
| continue; |
| } |
| |
| Object newVal = entity.getAttribute(attribute.getName()); |
| Object currVal = entityRetriever.getEntityAttribute(vertex, attribute); |
| |
| if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) { |
| hasUpdates = true; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check of relationsship-attribute value change |
| for (String attributeName : entityType.getRelationshipAttributes().keySet()) { |
| if (!entity.getRelationshipAttributes().containsKey(attributeName)) { // if value is not provided, current value will not be updated |
| continue; |
| } |
| |
| Object newVal = entity.getRelationshipAttribute(attributeName); |
| String relationshipType = AtlasEntityUtil.getRelationshipType(newVal); |
| AtlasAttribute attribute = entityType.getRelationshipAttribute(attributeName, relationshipType); |
| Object currVal = entityRetriever.getEntityAttribute(vertex, attribute); |
| |
| if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) { |
| hasUpdates = true; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| if (!hasUpdates && entity.getCustomAttributes() != null) { |
| Map<String, String> currCustomAttributes = getCustomAttributes(vertex); |
| Map<String, String> newCustomAttributes = entity.getCustomAttributes(); |
| |
| if (!Objects.equals(currCustomAttributes, newCustomAttributes)) { |
| hasUpdates = true; |
| } |
| } |
| |
| // if classifications are to be replaced, then skip updates only when no change in classifications |
| if (!hasUpdates && replaceClassifications) { |
| List<AtlasClassification> newVal = entity.getClassifications(); |
| List<AtlasClassification> currVal = entityRetriever.getAllClassifications(vertex); |
| |
| if (!Objects.equals(currVal, newVal)) { |
| hasUpdates = true; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("found classifications update: entity(guid={}, typeName={}), currValue={}, newValue={}", guid, entity.getTypeName(), currVal, newVal); |
| } |
| } |
| } |
| |
| if (!hasUpdates) { |
| if (entitiesToSkipUpdate == null) { |
| entitiesToSkipUpdate = new ArrayList<>(); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("skipping unchanged entity: {}", entity); |
| } |
| |
| entitiesToSkipUpdate.add(entity); |
| RequestContext.get().recordEntityToSkip(entity.getGuid()); |
| } |
| } |
| |
| if (entitiesToSkipUpdate != null) { |
| // remove entitiesToSkipUpdate from EntityMutationContext |
| context.getUpdatedEntities().removeAll(entitiesToSkipUpdate); |
| } |
| |
| // Check if authorized to update entities |
| if (!RequestContext.get().isImportInProgress()) { |
| for (AtlasEntity entity : context.getUpdatedEntities()) { |
| AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)), |
| "update entity: type=", entity.getTypeName()); |
| } |
| } |
| |
| RequestContext.get().endMetricRecord(checkForUnchangedEntities); |
| } |
| |
| EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications, replaceBusinessAttributes); |
| |
| ret.setGuidAssignments(context.getGuidAssignments()); |
| |
| // Notify the change listeners |
| entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== createOrUpdate()"); |
| } |
| |
| return ret; |
| } finally { |
| RequestContext.get().endMetricRecord(metric); |
| |
| AtlasPerfTracer.log(perf); |
| } |
| } |
| |
| private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { |
| MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate"); |
| |
| EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(graph, typeRegistry, entityStream, entityGraphMapper); |
| EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); |
| EntityMutationContext context = new EntityMutationContext(discoveryContext); |
| RequestContext requestContext = RequestContext.get(); |
| |
| for (String guid : discoveryContext.getReferencedGuids()) { |
| AtlasEntity entity = entityStream.getByGuid(guid); |
| |
| if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); |
| |
| if (entityType == null) { |
| throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); |
| } |
| |
| compactAttributes(entity, entityType); |
| |
| AtlasVertex vertex = getResolvedEntityVertex(discoveryContext, entity); |
| |
| if (vertex != null) { |
| if (!isPartialUpdate) { |
| graphDiscoverer.validateAndNormalize(entity); |
| |
| // change entity 'isInComplete' to 'false' during full update |
| if (isEntityIncomplete(vertex)) { |
| vertex.removeProperty(IS_INCOMPLETE_PROPERTY_KEY); |
| |
| entity.setIsIncomplete(FALSE); |
| } |
| } else { |
| graphDiscoverer.validateAndNormalizeForUpdate(entity); |
| } |
| |
| String guidVertex = AtlasGraphUtilsV2.getIdFromVertex(vertex); |
| |
| if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute |
| entity.setGuid(guidVertex); |
| |
| requestContext.recordEntityGuidUpdate(entity, guid); |
| } |
| |
| context.addUpdated(guid, entity, entityType, vertex); |
| } else { |
| graphDiscoverer.validateAndNormalize(entity); |
| |
| //Create vertices which do not exist in the repository |
| if (RequestContext.get().isImportInProgress() && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) { |
| vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid()); |
| } else { |
| vertex = entityGraphMapper.createVertex(entity); |
| } |
| |
| discoveryContext.addResolvedGuid(guid, vertex); |
| |
| discoveryContext.addResolvedIdByUniqAttribs(getAtlasObjectId(entity), vertex); |
| |
| String generatedGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex); |
| |
| entity.setGuid(generatedGuid); |
| |
| requestContext.recordEntityGuidUpdate(entity, guid); |
| |
| context.addCreated(guid, entity, entityType, vertex); |
| } |
| |
| // during import, update the system attributes |
| if (RequestContext.get().isImportInProgress()) { |
| Status newStatus = entity.getStatus(); |
| |
| if (newStatus != null) { |
| Status currStatus = AtlasGraphUtilsV2.getState(vertex); |
| |
| if (currStatus == Status.ACTIVE && newStatus == Status.DELETED) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("entity-delete via import - guid={}", guid); |
| } |
| |
| context.addEntityToDelete(vertex); |
| } else if (currStatus == Status.DELETED && newStatus == Status.ACTIVE) { |
| LOG.warn("Import is attempting to activate deleted entity (guid={}).", guid); |
| entityGraphMapper.importActivateEntity(vertex, entity); |
| context.addCreated(guid, entity, entityType, vertex); |
| } |
| } |
| |
| entityGraphMapper.updateSystemAttributes(vertex, entity); |
| } |
| } |
| } |
| |
| RequestContext.get().endMetricRecord(metric); |
| |
| return context; |
| } |
| |
| private AtlasVertex getResolvedEntityVertex(EntityGraphDiscoveryContext context, AtlasEntity entity) throws AtlasBaseException { |
| AtlasObjectId objectId = getAtlasObjectId(entity); |
| AtlasVertex ret = context.getResolvedEntityVertex(entity.getGuid()); |
| |
| if (ret != null) { |
| context.addResolvedIdByUniqAttribs(objectId, ret); |
| } else { |
| ret = context.getResolvedEntityVertex(objectId); |
| |
| if (ret != null) { |
| context.addResolvedGuid(entity.getGuid(), ret); |
| } |
| } |
| |
| return ret; |
| } |
| |
| private AtlasObjectId getAtlasObjectId(AtlasEntity entity) { |
| AtlasObjectId ret = entityRetriever.toAtlasObjectId(entity); |
| |
| if (ret != null && !RequestContext.get().isImportInProgress() && MapUtils.isNotEmpty(ret.getUniqueAttributes())) { |
| // if uniqueAttributes is not empty, reset guid to null. |
| ret.setGuid(null); |
| } |
| |
| return ret; |
| } |
| |
| private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException { |
| EntityMutationResponse response = new EntityMutationResponse(); |
| RequestContext req = RequestContext.get(); |
| |
| deleteDelegate.getHandler().deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities |
| |
| for (AtlasEntityHeader entity : req.getDeletedEntities()) { |
| response.addEntity(DELETE, entity); |
| } |
| |
| for (AtlasEntityHeader entity : req.getUpdatedEntities()) { |
| response.addEntity(UPDATE, entity); |
| } |
| |
| return response; |
| } |
| |
| private EntityMutationResponse purgeVertices(Collection<AtlasVertex> purgeCandidates) throws AtlasBaseException { |
| EntityMutationResponse response = new EntityMutationResponse(); |
| RequestContext req = RequestContext.get(); |
| |
| req.setDeleteType(DeleteType.HARD); |
| req.setPurgeRequested(true); |
| deleteDelegate.getHandler().deleteEntities(purgeCandidates); // this will update req with list of purged entities |
| |
| for (AtlasEntityHeader entity : req.getDeletedEntities()) { |
| response.addEntity(PURGE, entity); |
| } |
| |
| return response; |
| } |
| |
| private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException { |
| AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName()); |
| |
| if (type == null) { |
| throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName()); |
| } |
| |
| List<String> messages = new ArrayList<>(); |
| |
| type.validateValue(classification, classification.getTypeName(), messages); |
| |
| if (!messages.isEmpty()) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages); |
| } |
| |
| type.getNormalizedValue(classification); |
| } |
| |
| /** |
| * Validate if classification is not already associated with the entities |
| * |
| * @param guid unique entity id |
| * @param classifications list of classifications to be associated |
| */ |
| private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { |
| List<String> entityClassifications = getClassificationNames(guid); |
| String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(graph, guid); |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); |
| |
| for (AtlasClassification classification : classifications) { |
| String newClassification = classification.getTypeName(); |
| |
| if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid + |
| ", already associated with classification: " + newClassification); |
| } |
| |
| // for each classification, check whether there are entities it should be restricted to |
| AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification); |
| |
| if (!classificationType.canApplyToEntityType(entityType)) { |
| throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification); |
| } |
| } |
| } |
| |
| private List<String> getClassificationNames(String guid) throws AtlasBaseException { |
| List<String> ret = null; |
| List<AtlasClassification> classifications = retrieveClassifications(guid); |
| |
| if (CollectionUtils.isNotEmpty(classifications)) { |
| ret = new ArrayList<>(); |
| |
| for (AtlasClassification classification : classifications) { |
| String entityGuid = classification.getEntityGuid(); |
| |
| if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) { |
| ret.add(classification.getTypeName()); |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| // move/remove relationship-attributes present in 'attributes' |
| private void compactAttributes(AtlasEntity entity, AtlasEntityType entityType) { |
| if (entity != null) { |
| for (String attrName : entityType.getRelationshipAttributes().keySet()) { |
| if (entity.hasAttribute(attrName)) { // relationship attribute is present in 'attributes' |
| Object attrValue = entity.removeAttribute(attrName); |
| |
| if (attrValue != null) { |
| // if the attribute doesn't exist in relationshipAttributes, add it |
| Object relationshipAttrValue = entity.getRelationshipAttribute(attrName); |
| |
| if (relationshipAttrValue == null) { |
| entity.setRelationshipAttribute(attrName, attrValue); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("moved attribute {}.{} from attributes to relationshipAttributes", entityType.getTypeName(), attrName); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("attribute {}.{} is present in attributes and relationshipAttributes. Removed from attributes", entityType.getTypeName(), attrName); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private void validateBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes, boolean isOverwrite) throws AtlasBaseException { |
| List<String> messages = new ArrayList<>(); |
| |
| Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessMetadata = entityType.getBusinessAttributes(); |
| |
| for (String bmName : businessAttributes.keySet()) { |
| if (!entityTypeBusinessMetadata.containsKey(bmName)) { |
| messages.add(bmName + ": invalid business-metadata for entity type " + entityType.getTypeName()); |
| |
| continue; |
| } |
| |
| Map<String, AtlasBusinessAttribute> entityTypeBusinessAttributes = entityTypeBusinessMetadata.get(bmName); |
| Map<String, Object> entityBusinessAttributes = businessAttributes.get(bmName); |
| |
| for (AtlasBusinessAttribute bmAttribute : entityTypeBusinessAttributes.values()) { |
| AtlasType attrType = bmAttribute.getAttributeType(); |
| String attrName = bmAttribute.getName(); |
| Object attrValue = entityBusinessAttributes.get(attrName); |
| String fieldName = entityType.getTypeName() + "." + bmName + "." + attrName; |
| |
| if (attrValue != null) { |
| attrType.validateValue(attrValue, fieldName, messages); |
| boolean isValidLength = bmAttribute.isValidLength(attrValue); |
| if (!isValidLength) { |
| messages.add(fieldName + ": Business attribute-value exceeds maximum length limit"); |
| } |
| |
| } else if (!bmAttribute.getAttributeDef().getIsOptional()) { |
| final boolean isAttrValuePresent; |
| |
| if (isOverwrite) { |
| isAttrValuePresent = false; |
| } else { |
| Object existingValue = AtlasGraphUtilsV2.getEncodedProperty(entityVertex, bmAttribute.getVertexPropertyName(), Object.class); |
| |
| isAttrValuePresent = existingValue != null; |
| } |
| |
| if (!isAttrValuePresent) { |
| messages.add(fieldName + ": mandatory business-metadata attribute value missing in type " + entityType.getTypeName()); |
| } |
| } |
| } |
| } |
| |
| if (!messages.isEmpty()) { |
| throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); |
| } |
| } |
| |
| @Override |
| @GraphTransaction |
| public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException { |
| BulkImportResponse ret = new BulkImportResponse(); |
| |
| try { |
| if (StringUtils.isBlank(fileName)) { |
| throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName); |
| } |
| |
| List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); |
| Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret); |
| |
| for (AtlasEntity entity : attributesToAssociate.values()) { |
| try { |
| addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true); |
| |
| BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString()); |
| |
| ret.setSuccessImportInfoList(successImportInfo); |
| }catch (Exception e) { |
| LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + entity.getGuid()); |
| |
| BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage()); |
| |
| ret.getFailedImportInfoList().add(failedImportInfo); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("An Exception occurred while uploading the file {}", fileName, e); |
| |
| throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e); |
| } |
| |
| return ret; |
| } |
| |
| private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException { |
| Map<String, AtlasEntity> ret = new HashMap<>(); |
| Map<String, AtlasVertex> vertexCache = new HashMap<>(); |
| List<String> failedMsgList = new ArrayList<>(); |
| |
| for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) { |
| String[] record = fileData.get(lineIndex); |
| |
| boolean missingFields = record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX || |
| StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) || |
| StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) || |
| StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) || |
| StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]); |
| |
| if (missingFields){ |
| failedMsgList.add("Line #" + (lineIndex + 1) + ": missing fields. " + Arrays.toString(record)); |
| |
| continue; |
| } |
| |
| String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX]; |
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); |
| |
| if (entityType == null) { |
| failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid entity-type '" + typeName + "'"); |
| |
| continue; |
| } |
| |
| String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]; |
| String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]; |
| String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]; |
| String uniqueAttrName = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; |
| |
| if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) { |
| uniqueAttrName = record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX]; |
| } |
| |
| AtlasAttribute uniqueAttribute = entityType.getAttribute(uniqueAttrName); |
| |
| if (uniqueAttribute == null) { |
| failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' not found in entity-type '" + typeName + "'"); |
| |
| continue; |
| } |
| |
| if (!uniqueAttribute.getAttributeDef().getIsUnique()) { |
| failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' is not an unique attribute in entity-type '" + typeName + "'"); |
| |
| continue; |
| } |
| |
| String vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue; |
| AtlasVertex vertex = vertexCache.get(vertexKey); |
| |
| if (vertex == null) { |
| vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue); |
| |
| if (vertex == null) { |
| failedMsgList.add("Line #" + (lineIndex + 1) + ": no " + typeName + " entity found with " + uniqueAttrName + "=" + uniqueAttrValue); |
| |
| continue; |
| } |
| |
| vertexCache.put(vertexKey, vertex); |
| } |
| |
| AtlasBusinessAttribute businessAttribute = entityType.getBusinesAAttribute(bmAttribute); |
| |
| if (businessAttribute == null) { |
| failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid business attribute name '" + bmAttribute + "'"); |
| |
| continue; |
| } |
| |
| final Object attrValue; |
| |
| if (businessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) { |
| AtlasArrayType arrayType = (AtlasArrayType) businessAttribute.getAttributeType(); |
| List arrayValue; |
| |
| if (arrayType.getElementType() instanceof AtlasEnumType) { |
| arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex+1); |
| } else { |
| arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex+1); |
| } |
| |
| attrValue = arrayValue; |
| } else { |
| attrValue = bmAttributeValue; |
| } |
| |
| if (ret.containsKey(vertexKey)) { |
| AtlasEntity entity = ret.get(vertexKey); |
| |
| entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue); |
| } else { |
| AtlasEntity entity = new AtlasEntity(); |
| String guid = GraphHelper.getGuid(vertex); |
| Map<String, Map<String, Object>> businessAttributes = entityRetriever.getBusinessMetadata(vertex); |
| |
| entity.setGuid(guid); |
| entity.setTypeName(typeName); |
| entity.setAttribute(uniqueAttribute.getName(), uniqueAttrValue); |
| |
| if (businessAttributes == null) { |
| businessAttributes = new HashMap<>(); |
| } |
| |
| entity.setBusinessAttributes(businessAttributes); |
| entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue); |
| |
| ret.put(vertexKey, entity); |
| } |
| } |
| |
| for (String failedMsg : failedMsgList) { |
| LOG.error(failedMsg); |
| |
| BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedMsg); |
| |
| bulkImportResponse.getFailedImportInfoList().add(importInfo); |
| } |
| |
| return ret; |
| } |
| |
| |
| private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) { |
| |
| String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER); |
| try { |
| switch (elementTypeName) { |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_FLOAT: |
| return AtlasGraphUtilsV2.floatParser(arr, failedTermMsgList, lineIndex); |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_INT: |
| return AtlasGraphUtilsV2.intParser(arr, failedTermMsgList, lineIndex); |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_LONG: |
| return AtlasGraphUtilsV2.longParser(arr, failedTermMsgList, lineIndex); |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_SHORT: |
| return AtlasGraphUtilsV2.shortParser(arr, failedTermMsgList, lineIndex); |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE: |
| return AtlasGraphUtilsV2.doubleParser(arr, failedTermMsgList, lineIndex); |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_DATE: |
| return AtlasGraphUtilsV2.dateParser(arr, failedTermMsgList, lineIndex); |
| |
| case AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN: |
| return AtlasGraphUtilsV2.booleanParser(arr, failedTermMsgList, lineIndex); |
| |
| default: |
| return Arrays.asList(arr); |
| } |
| } catch (Exception e) { |
| LOG.error("On line index " + lineIndex + "the provided BusinessMetadata AttributeValue " + bmAttributeValues + " are not of type - " + elementTypeName); |
| failedTermMsgList.add("On line index " + lineIndex + "the provided BusinessMetadata AttributeValue " + bmAttributeValues + " are not of type - " + elementTypeName); |
| } |
| return null; |
| } |
| |
| private boolean missingFieldsCheck(String[] record, BulkImportResponse bulkImportResponse, int lineIndex){ |
| boolean missingFieldsCheck = (record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) || |
| StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) || |
| StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) || |
| StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) || |
| StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]); |
| |
| if(missingFieldsCheck){ |
| LOG.error("Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex); |
| String failedTermMsgs = "Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex; |
| BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex); |
| bulkImportResponse.getFailedImportInfoList().add(importInfo); |
| } |
| return missingFieldsCheck; |
| } |
| } |