blob: 8ca8c9a0bbce7d7062bcbf5e19e91f48d628fc97 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.TERM_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.TERM_DELETE;
@Component
public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class);
private final EntityAuditRepository auditRepository;
private final AtlasTypeRegistry typeRegistry;
private final AtlasInstanceConverter instanceConverter;
@Inject
public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter) {
this.auditRepository = auditRepository;
this.typeRegistry = typeRegistry;
this.instanceConverter = instanceConverter;
}
@Override
public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_CREATE : ENTITY_CREATE);
events.add(event);
}
auditRepository.putEventsV2(events);
}
@Override
public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
events.add(event);
}
auditRepository.putEventsV2(events);
}
@Override
public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
events.add(event);
}
auditRepository.putEventsV2(events);
}
@Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
}
}
auditRepository.putEventsV2(events);
}
}
@Override
public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
List<EntityAuditEventV2> events = new ArrayList<>();
String guid = entity.getGuid();
for (AtlasClassification classification : classifications) {
if (guid.equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification)));
} else {
if (isPropagatedClassificationAdded(guid, classification)) {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
} else if (isPropagatedClassificationDeleted(guid, classification)) {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification)));
}
}
}
auditRepository.putEventsV2(events);
}
}
@Override
public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName()));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
}
}
auditRepository.putEventsV2(events);
}
}
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
if (entity != null) {
events.add(createEvent(entity, TERM_ADD, "Added term: " + term.toAuditString()));
}
}
auditRepository.putEventsV2(events);
}
}
@Override
public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
if (entity != null) {
events.add(createEvent(entity, TERM_DELETE, "Deleted term: " + term.toAuditString()));
}
}
auditRepository.putEventsV2(events);
}
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(),
RequestContext.get().getUser(), action, details, entity);
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action) {
String detail = getAuditEventDetail(entity, action);
return createEvent(entity, action, detail);
}
private String getAuditEventDetail(AtlasEntity entity, EntityAuditActionV2 action) {
Map<String, Object> prunedAttributes = pruneEntityAttributesForAudit(entity);
String auditPrefix = getV2AuditPrefix(action);
String auditString = auditPrefix + AtlasType.toJson(entity);
byte[] auditBytes = auditString.getBytes(StandardCharsets.UTF_8);
long auditSize = auditBytes != null ? auditBytes.length : 0;
long auditMaxSize = auditRepository.repositoryMaxSize();
if (auditMaxSize >= 0 && auditSize > auditMaxSize) { // don't store attributes in audit
LOG.warn("audit record too long: entityType={}, guid={}, size={}; maxSize={}. entity attribute values not stored in audit",
entity.getTypeName(), entity.getGuid(), auditSize, auditMaxSize);
Map<String, Object> attrValues = entity.getAttributes();
Map<String, Object> relAttrValues = entity.getRelationshipAttributes();
entity.setAttributes(null);
entity.setRelationshipAttributes(null);
auditString = auditPrefix + AtlasType.toJson(entity);
auditBytes = auditString.getBytes(StandardCharsets.UTF_8); // recheck auditString size
auditSize = auditBytes != null ? auditBytes.length : 0;
if (auditMaxSize >= 0 && auditSize > auditMaxSize) { // don't store classifications and meanings as well
LOG.warn("audit record still too long: entityType={}, guid={}, size={}; maxSize={}. audit will have only summary details",
entity.getTypeName(), entity.getGuid(), auditSize, auditMaxSize);
AtlasEntity shallowEntity = new AtlasEntity();
shallowEntity.setGuid(entity.getGuid());
shallowEntity.setTypeName(entity.getTypeName());
shallowEntity.setCreateTime(entity.getCreateTime());
shallowEntity.setUpdateTime(entity.getUpdateTime());
shallowEntity.setCreatedBy(entity.getCreatedBy());
shallowEntity.setUpdatedBy(entity.getUpdatedBy());
shallowEntity.setStatus(entity.getStatus());
shallowEntity.setVersion(entity.getVersion());
auditString = auditPrefix + AtlasType.toJson(shallowEntity);
}
entity.setAttributes(attrValues);
entity.setRelationshipAttributes(relAttrValues);
}
restoreEntityAttributes(entity, prunedAttributes);
return auditString;
}
private boolean isPropagatedClassificationAdded(String guid, AtlasClassification classification) {
Map<String, List<AtlasClassification>> addedPropagations = RequestContext.get().getAddedPropagations();
return hasPropagatedEntry(addedPropagations, guid, classification);
}
private boolean isPropagatedClassificationDeleted(String guid, AtlasClassification classification) {
Map<String, List<AtlasClassification>> removedPropagations = RequestContext.get().getRemovedPropagations();
return hasPropagatedEntry(removedPropagations, guid, classification);
}
private boolean hasPropagatedEntry(Map<String, List<AtlasClassification>> propagationsMap, String guid, AtlasClassification classification) {
boolean ret = false;
if (MapUtils.isNotEmpty(propagationsMap) && propagationsMap.containsKey(guid) && CollectionUtils.isNotEmpty(propagationsMap.get(guid))) {
List<AtlasClassification> classifications = propagationsMap.get(guid);
String classificationName = classification.getTypeName();
String entityGuid = classification.getEntityGuid();
for (AtlasClassification c : classifications) {
if (StringUtils.equals(c.getTypeName(), classificationName) && StringUtils.equals(c.getEntityGuid(), entityGuid)) {
ret = true;
break;
}
}
}
return ret;
}
private Map<String, Object> pruneEntityAttributesForAudit(AtlasEntity entity) {
Map<String, Object> ret = null;
Map<String, Object> entityAttributes = entity.getAttributes();
List<String> excludeAttributes = auditRepository.getAuditExcludeAttributes(entity.getTypeName());
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (CollectionUtils.isNotEmpty(excludeAttributes) && MapUtils.isNotEmpty(entityAttributes) && entityType != null) {
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
String attrName = attribute.getName();
Object attrValue = entityAttributes.get(attrName);
if (excludeAttributes.contains(attrName)) {
if (ret == null) {
ret = new HashMap<>();
}
ret.put(attrName, attrValue);
entityAttributes.remove(attrName);
}
}
}
return ret;
}
private void restoreEntityAttributes(AtlasEntity entity, Map<String, Object> prunedAttributes) {
if (MapUtils.isEmpty(prunedAttributes)) {
return;
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (entityType != null && MapUtils.isNotEmpty(entityType.getAllAttributes())) {
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
String attrName = attribute.getName();
if (prunedAttributes.containsKey(attrName)) {
entity.setAttribute(attrName, prunedAttributes.get(attrName));
}
}
}
}
private String getV1AuditPrefix(EntityAuditAction action) {
final String ret;
switch (action) {
case ENTITY_CREATE:
ret = "Created: ";
break;
case ENTITY_UPDATE:
ret = "Updated: ";
break;
case ENTITY_DELETE:
ret = "Deleted: ";
break;
case TAG_ADD:
ret = "Added classification: ";
break;
case TAG_DELETE:
ret = "Deleted classification: ";
break;
case TAG_UPDATE:
ret = "Updated classification: ";
break;
case ENTITY_IMPORT_CREATE:
ret = "Created by import: ";
break;
case ENTITY_IMPORT_UPDATE:
ret = "Updated by import: ";
break;
case ENTITY_IMPORT_DELETE:
ret = "Deleted by import: ";
break;
case TERM_ADD:
ret = "Added term: ";
break;
case TERM_DELETE:
ret = "Deleted term: ";
break;
default:
ret = "Unknown: ";
}
return ret;
}
private String getV2AuditPrefix(EntityAuditActionV2 action) {
final String ret;
switch (action) {
case ENTITY_CREATE:
ret = "Created: ";
break;
case ENTITY_UPDATE:
ret = "Updated: ";
break;
case ENTITY_DELETE:
ret = "Deleted: ";
break;
case CLASSIFICATION_ADD:
ret = "Added classification: ";
break;
case CLASSIFICATION_DELETE:
ret = "Deleted classification: ";
break;
case CLASSIFICATION_UPDATE:
ret = "Updated classification: ";
break;
case ENTITY_IMPORT_CREATE:
ret = "Created by import: ";
break;
case ENTITY_IMPORT_UPDATE:
ret = "Updated by import: ";
break;
case ENTITY_IMPORT_DELETE:
ret = "Deleted by import: ";
break;
case TERM_ADD:
ret = "Added term: ";
break;
case TERM_DELETE:
ret = "Deleted term: ";
break;
default:
ret = "Unknown: ";
}
return ret;
}
}