blob: fa4c051be39a2b3a294805833df7b677aaf0d2bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@Singleton
public class AtlasEntityStoreV1 implements AtlasEntityStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
private final DeleteHandlerV1 deleteHandler;
private final AtlasTypeRegistry typeRegistry;
private final AtlasEntityChangeNotifier entityChangeNotifier;
@Inject
public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
this.deleteHandler = deleteHandler;
this.typeRegistry = typeRegistry;
this.entityChangeNotifier = entityChangeNotifier;
}
@Override
@GraphTransaction
public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getById({})", guid);
}
EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getById({}): {}", guid, ret);
}
return ret;
}
@Override
@GraphTransaction
public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getByIds({})", guids);
}
EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids);
if (LOG.isDebugEnabled()) {
LOG.debug("<== getByIds({}): {}", guids, ret);
}
return ret;
}
@Override
@GraphTransaction
public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
}
AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
uniqAttributes.toString());
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret);
}
return ret;
}
@Override
public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()");
}
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<String, String>());
Set<String> processedGuids = new HashSet<>();
int progressReportedAtCount = 0;
while (entityStream.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if(entity == null || processedGuids.contains(entity.getGuid())) {
continue;
}
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
addClassifications(entity.getGuid(), entity.getClassifications());
}
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
if ((processedGuids.size() - progressReportedAtCount) > 1000) {
progressReportedAtCount = processedGuids.size();
LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
}
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
}
entityStream.onImportComplete(entity.getGuid());
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size());
return ret;
}
private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
for (AtlasEntityHeader h : list) {
if(processedGuids.contains(h.getGuid())) {
continue;
}
processedGuids.add(h.getGuid());
importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
}
}
@GraphTransaction
private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createOrUpdate()");
}
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
// Create/Update entities
EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);
ret.setGuidAssignments(context.getGuidAssignments());
if (LOG.isDebugEnabled()) {
LOG.debug("<== createOrUpdate()");
}
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
}
@Override
public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException {
return createOrUpdate(entityStream, isPartialUpdate, false);
}
@Override
@GraphTransaction
public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> updateByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes);
}
if (updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
}
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
AtlasEntity entity = updatedEntityInfo.getEntity();
entity.setGuid(guid);
return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true);
}
@Override
@GraphTransaction
public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue)
throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue);
}
AtlasEntityWithExtInfo entityInfo = getById(guid);
if (entityInfo == null || entityInfo.getEntity() == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
AtlasEntity entity = entityInfo.getEntity();
AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
AtlasAttribute attr = entityType.getAttribute(attrName);
if (attr == null) {
throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName());
}
AtlasType attrType = attr.getAttributeType();
AtlasEntity updateEntity = new AtlasEntity();
updateEntity.setGuid(guid);
updateEntity.setTypeName(entity.getTypeName());
switch (attrType.getTypeCategory()) {
case PRIMITIVE:
updateEntity.setAttribute(attrName, attrValue);
break;
case OBJECT_ID_TYPE:
AtlasObjectId objId;
if (attrValue instanceof String) {
objId = new AtlasObjectId((String) attrValue, attr.getAttributeDef().getTypeName());
} else {
objId = (AtlasObjectId) attrType.getNormalizedValue(attrValue);
}
updateEntity.setAttribute(attrName, objId);
break;
default:
throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName());
}
return createOrUpdate(new AtlasEntityStream(updateEntity), true);
}
@GraphTransaction
public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException {
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
// Retrieve vertices for requested guids.
AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid);
Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
if (vertex != null) {
deletionCandidates.add(vertex);
} else {
if (LOG.isDebugEnabled()) {
// Entity does not exist - treat as non-error, since the caller
// wanted to delete the entity and it's already gone.
LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
}
}
EntityMutationResponse ret = deleteVertices(deletionCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
}
@Override
@GraphTransaction
public EntityMutationResponse deleteByIds(final List<String> guids) throws AtlasBaseException {
if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
for (String guid : guids) {
// Retrieve vertices for requested guids.
AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid);
if (vertex != null) {
deletionCandidates.add(vertex);
} else {
if (LOG.isDebugEnabled()) {
// Entity does not exist - treat as non-error, since the caller
// wanted to delete the entity and it's already gone.
LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
}
}
}
if (deletionCandidates.isEmpty()) {
LOG.info("No deletion candidate entities were found for guids %s", guids);
}
EntityMutationResponse ret = deleteVertices(deletionCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
}
@Override
@GraphTransaction
public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
throws AtlasBaseException {
if (MapUtils.isEmpty(uniqAttributes)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString());
}
final AtlasVertex vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, uniqAttributes);
Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
if (vertex != null) {
deletionCandidates.add(vertex);
} else {
if (LOG.isDebugEnabled()) {
// Entity does not exist - treat as non-error, since the caller
// wanted to delete the entity and it's already gone.
LOG.debug("Deletion request ignored for non-existent entity with uniqueAttributes " + uniqAttributes);
}
}
EntityMutationResponse ret = deleteVertices(deletionCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
}
@Override
@GraphTransaction
public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException {
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
if (CollectionUtils.isEmpty(classifications)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classifications={} to entity={}", classifications, guid);
}
for (AtlasClassification classification : classifications) {
validateAndNormalize(classification);
}
// validate if entity, not already associated with classifications
validateEntityAssociations(guid, classifications);
EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
graphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
// notify listeners on classification addition
entityChangeNotifier.onClassificationAddedToEntity(guid, classifications);
}
@Override
@GraphTransaction
public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException {
if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
if (classification == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classification={} to entities={}", classification, guids);
}
EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
validateAndNormalize(classification);
List<AtlasClassification> classifications = Collections.singletonList(classification);
for (String guid : guids) {
// validate if entity, not already associated with classifications
validateEntityAssociations(guid, classifications);
graphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
// notify listeners on classification addition
entityChangeNotifier.onClassificationAddedToEntity(guid, classifications);
}
}
@Override
@GraphTransaction
public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException {
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
if (CollectionUtils.isEmpty(classificationNames)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid);
}
EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
entityGraphMapper.deleteClassifications(guid, classificationNames);
// notify listeners on classification deletion
entityChangeNotifier.onClassificationDeletedFromEntity(guid, classificationNames);
}
@Override
@GraphTransaction
public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting classifications for entities={}", guid);
}
EntityGraphRetriever graphRetriever = new EntityGraphRetriever(typeRegistry);
return graphRetriever.getClassifications(guid);
}
@Override
@GraphTransaction
public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting classifications for entities={}", guid);
}
EntityGraphRetriever graphRetriever = new EntityGraphRetriever(typeRegistry);
return graphRetriever.getClassification(guid, classificationName);
}
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
for (String guid : discoveryContext.getReferencedGuids()) {
AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
AtlasEntity entity = entityStream.getByGuid(guid);
if (entity != null) {
if (vertex != null) {
// entity would be null if guid is not in the stream but referenced by an entity in the stream
if (!isPartialUpdate) {
graphDiscoverer.validateAndNormalize(entity);
} else {
graphDiscoverer.validateAndNormalizeForUpdate(entity);
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String guidVertex = AtlasGraphUtilsV1.getIdFromVertex(vertex);
if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
entity.setGuid(guidVertex);
}
context.addUpdated(guid, entity, entityType, vertex);
} else {
graphDiscoverer.validateAndNormalize(entity);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
//Create vertices which do not exist in the repository
if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
} else {
vertex = entityGraphMapper.createVertex(entity);
}
discoveryContext.addResolvedGuid(guid, vertex);
String generatedGuid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
entity.setGuid(generatedGuid);
context.addCreated(guid, entity, entityType, vertex);
}
}
}
return context;
}
private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException {
EntityMutationResponse response = new EntityMutationResponse();
deleteHandler.deleteEntities(deletionCandidates);
RequestContextV1 req = RequestContextV1.get();
for (AtlasObjectId id : req.getDeletedEntityIds()) {
response.addEntity(DELETE, EntityGraphMapper.constructHeader(id));
}
for (AtlasObjectId id : req.getUpdatedEntityIds()) {
response.addEntity(UPDATE, EntityGraphMapper.constructHeader(id));
}
return response;
}
private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
if (type == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
}
List<String> messages = new ArrayList<>();
type.validateValue(classification, classification.getTypeName(), messages);
if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
}
type.getNormalizedValue(classification);
}
/**
* Validate if classification is not already associated with the entities
* @param guid unique entity id
* @param classifications list of classifications to be associated
*/
private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
List<String> entityClassifications = getClassificationNames(guid);
for (AtlasClassification classification : classifications) {
String newClassification = classification.getTypeName();
if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid +
", already associated with classification: " + newClassification);
}
}
}
private List<String> getClassificationNames(String guid) throws AtlasBaseException {
List<String> ret = null;
List<AtlasClassification> classifications = getClassifications(guid);
if (CollectionUtils.isNotEmpty(classifications)) {
ret = new ArrayList<>();
for (AtlasClassification classification : classifications) {
ret.add(classification.getTypeName());
}
}
return ret;
}
}