blob: 608b4a3047f21b2a47e7444ac75657a2818f1bb4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_CLUSTER_NAME;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_ENTITY_NAME;
public class PreprocessorContext {
private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
public enum PreprocessAction { NONE, IGNORE, PRUNE }
private final AtlasKafkaMessage<HookNotification> kafkaMessage;
private final AtlasTypeRegistry typeRegistry;
private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
private final List<String> hiveDummyDatabasesToIgnore;
private final List<String> hiveDummyTablesToIgnore;
private final List<String> hiveTablePrefixesToIgnore;
private final boolean updateHiveProcessNameWithQualifiedName;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean isHivePreProcessEnabled;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
private final Set<String> createdEntities = new HashSet<>();
private final Set<String> deletedEntities = new HashSet<>();
private final Map<String, String> guidAssignments = new HashMap<>();
private List<AtlasEntity> postUpdateEntities = null;
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName) {
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
this.updateHiveProcessNameWithQualifiedName = updateHiveProcessNameWithQualifiedName;
final HookNotification message = kafkaMessage.getMessage();
switch (message.getType()) {
case ENTITY_CREATE_V2:
entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities();
break;
case ENTITY_FULL_UPDATE_V2:
entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
break;
default:
entitiesWithExtInfo = null;
break;
}
this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
return kafkaMessage;
}
public long getKafkaMessageOffset() {
return kafkaMessage.getOffset();
}
public int getKafkaPartition() {
return kafkaMessage.getPartition();
}
public boolean updateHiveProcessNameWithQualifiedName() { return updateHiveProcessNameWithQualifiedName; }
public boolean getHiveTypesRemoveOwnedRefAttrs() { return hiveTypesRemoveOwnedRefAttrs; }
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public boolean isHivePreprocessEnabled() {
return isHivePreProcessEnabled;
}
public List<AtlasEntity> getEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
}
public Map<String, AtlasEntity> getReferredEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getReferredEntities() : null;
}
public AtlasEntity getEntity(String guid) {
return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
}
public AtlasEntity removeReferredEntity(String guid) {
Map<String, AtlasEntity> referredEntities = getReferredEntities();
return referredEntities != null && guid != null ? referredEntities.remove(guid) : null;
}
public Set<String> getIgnoredEntities() { return ignoredEntities; }
public Set<String> getPrunedEntities() { return prunedEntities; }
public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
public Set<String> getCreatedEntities() { return createdEntities; }
public Set<String> getDeletedEntities() { return deletedEntities; }
public Map<String, String> getGuidAssignments() { return guidAssignments; }
public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; }
public PreprocessAction getPreprocessActionForHiveDb(String dbName) {
PreprocessAction ret = PreprocessAction.NONE;
if (dbName != null) {
for (String dummyDbName : hiveDummyDatabasesToIgnore) {
if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
ret = PreprocessAction.IGNORE;
break;
}
}
}
return ret;
}
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
if (qualifiedName != null) {
if (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune)) {
ret = hiveTablesCache.get(qualifiedName);
if (ret == null) {
if (isMatch(qualifiedName, hiveTablesToIgnore)) {
ret = PreprocessAction.IGNORE;
} else if (isMatch(qualifiedName, hiveTablesToPrune)) {
ret = PreprocessAction.PRUNE;
} else {
ret = PreprocessAction.NONE;
}
hiveTablesCache.put(qualifiedName, ret);
}
}
if (ret != PreprocessAction.IGNORE && (CollectionUtils.isNotEmpty(hiveDummyTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablePrefixesToIgnore))) {
String tblName = getHiveTableNameFromQualifiedName(qualifiedName);
if (tblName != null) {
for (String dummyTblName : hiveDummyTablesToIgnore) {
if (StringUtils.equalsIgnoreCase(tblName, dummyTblName)) {
ret = PreprocessAction.IGNORE;
break;
}
}
if (ret != PreprocessAction.IGNORE) {
for (String tableNamePrefix : hiveTablePrefixesToIgnore) {
if (StringUtils.startsWithIgnoreCase(tblName, tableNamePrefix)) {
ret = PreprocessAction.IGNORE;
break;
}
}
}
}
if (ret != PreprocessAction.IGNORE && CollectionUtils.isNotEmpty(hiveDummyDatabasesToIgnore)) {
String dbName = getHiveDbNameFromQualifiedName(qualifiedName);
if (dbName != null) {
for (String dummyDbName : hiveDummyDatabasesToIgnore) {
if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
ret = PreprocessAction.IGNORE;
break;
}
}
}
}
}
}
return ret;
}
public boolean isIgnoredEntity(String guid) {
return guid != null ? ignoredEntities.contains(guid) : false;
}
public boolean isPrunedEntity(String guid) {
return guid != null ? prunedEntities.contains(guid) : false;
}
public void addToIgnoredEntities(AtlasEntity entity) {
if (!ignoredEntities.contains(entity.getGuid())) {
ignoredEntities.add(entity.getGuid());
LOG.info("ignored entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
}
}
public void addToPrunedEntities(AtlasEntity entity) {
if (!prunedEntities.contains(entity.getGuid())) {
prunedEntities.add(entity.getGuid());
LOG.info("pruned entity: typeName={}, qualifiedName={} topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
}
}
public void addToIgnoredEntities(String guid) {
if (guid != null) {
ignoredEntities.add(guid);
}
}
public void addToPrunedEntities(String guid) {
if (guid != null) {
prunedEntities.add(guid);
}
}
public void addToReferredEntitiesToMove(String guid) {
if (guid != null) {
referredEntitiesToMove.add(guid);
}
}
public void addToReferredEntitiesToMove(Collection<String> guids) {
if (guids != null) {
for (String guid : guids) {
addToReferredEntitiesToMove(guid);
}
}
}
public void addToIgnoredEntities(Object obj) {
collectGuids(obj, ignoredEntities);
}
public void addToPrunedEntities(Object obj) {
collectGuids(obj, prunedEntities);
}
public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, String refAttrName) {
Object attrVal = entity.removeAttribute(attrName);
if (attrVal != null) {
AtlasRelatedObjectId entityId = null;
Set<String> guids = new HashSet<>();
collectGuids(attrVal, guids);
// removed attrVal might have elements removed (e.g. removed column); to handle this case register the entity for partial update
addToPostUpdate(entity, attrName, attrVal);
for (String guid : guids) {
AtlasEntity refEntity = getEntity(guid);
if (refEntity != null) {
Object refAttr = null;
if (refEntity.hasRelationshipAttribute(refAttrName)) {
refAttr = refEntity.getRelationshipAttribute(refAttrName);
} else if (refEntity.hasAttribute(refAttrName)) {
refAttr = refEntity.getAttribute(refAttrName);
} else {
if (entityId == null) {
entityId = AtlasTypeUtil.toAtlasRelatedObjectId(entity, typeRegistry);
}
refAttr = entityId;
}
if (refAttr != null) {
refAttr = setRelationshipType(refAttr, relationshipType);
}
if (refAttr != null) {
refEntity.setRelationshipAttribute(refAttrName, refAttr);
}
addToReferredEntitiesToMove(guid);
}
}
}
}
public void moveRegisteredReferredEntities() {
List<AtlasEntity> entities = getEntities();
Map<String, AtlasEntity> referredEntities = getReferredEntities();
if (entities != null && referredEntities != null && !referredEntitiesToMove.isEmpty()) {
AtlasEntity firstEntity = entities.isEmpty() ? null : entities.get(0);
for (String guid : referredEntitiesToMove) {
AtlasEntity entity = referredEntities.remove(guid);
if (entity != null) {
entities.add(entity);
if (LOG.isDebugEnabled()) {
LOG.debug("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
}
}
}
if (firstEntity != null) {
LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
} else {
LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition());
}
referredEntitiesToMove.clear();
}
}
public void prepareForPostUpdate() {
if (postUpdateEntities != null) {
ListIterator<AtlasEntity> iter = postUpdateEntities.listIterator();
while (iter.hasNext()) {
AtlasEntity entity = iter.next();
String assignedGuid = getAssignedGuid(entity.getGuid());
// no need to perform partial-update for entities that are created/deleted while processing this message
if (createdEntities.contains(assignedGuid) || deletedEntities.contains(assignedGuid)) {
iter.remove();
} else {
entity.setGuid(assignedGuid);
if (entity.getAttributes() != null) {
setAssignedGuids(entity.getAttributes().values());
}
if (entity.getRelationshipAttributes() != null) {
setAssignedGuids(entity.getRelationshipAttributes().values());
}
}
}
}
}
public String getHiveTableNameFromQualifiedName(String qualifiedName) {
String ret = null;
int idxStart = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME) + 1;
if (idxStart != 0 && qualifiedName.length() > idxStart) {
int idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME, idxStart);
if (idxEnd != -1) {
ret = qualifiedName.substring(idxStart, idxEnd);
}
}
return ret;
}
public String getHiveDbNameFromQualifiedName(String qualifiedName) {
String ret = null;
int idxEnd = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME); // db.table@cluster, db.table.column@cluster
if (idxEnd == -1) {
idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME); // db@cluster
}
if (idxEnd != -1) {
ret = qualifiedName.substring(0, idxEnd);
}
return ret;
}
public String getTypeName(Object obj) {
Object ret = null;
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getTypeName();
} else if (obj instanceof Map) {
ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getTypeName();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getTypeName();
}
return ret != null ? ret.toString() : null;
}
public String getGuid(Object obj) {
Object ret = null;
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getGuid();
} else if (obj instanceof Map) {
ret = ((Map) obj).get(KEY_GUID);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getGuid();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid();
}
return ret != null ? ret.toString() : null;
}
public void collectGuids(Object obj, Set<String> guids) {
if (obj != null) {
if (obj instanceof Collection) {
Collection objList = (Collection) obj;
for (Object objElem : objList) {
collectGuids(objElem, guids);
}
} else {
collectGuid(obj, guids);
}
}
}
public void collectGuid(Object obj, Set<String> guids) {
String guid = getGuid(obj);
if (guid != null) {
guids.add(guid);
}
}
private boolean isMatch(String name, List<Pattern> patterns) {
boolean ret = false;
for (Pattern p : patterns) {
if (p.matcher(name).matches()) {
ret = true;
break;
}
}
return ret;
}
private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
AtlasRelatedObjectId ret = null;
if (attr instanceof AtlasRelatedObjectId) {
ret = (AtlasRelatedObjectId) attr;
} else if (attr instanceof AtlasObjectId) {
ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
} else if (attr instanceof Map) {
ret = new AtlasRelatedObjectId((Map) attr);
}
if (ret != null) {
ret.setRelationshipType(relationshipType);
}
return ret;
}
private String getAssignedGuid(String guid) {
String ret = guidAssignments.get(guid);
return ret != null ? ret : guid;
}
private void setAssignedGuids(Object obj) {
if (obj != null) {
if (obj instanceof Collection) {
Collection objList = (Collection) obj;
for (Object objElem : objList) {
setAssignedGuids(objElem);
}
} else {
setAssignedGuid(obj);
}
}
}
private void setAssignedGuid(Object obj) {
if (obj instanceof AtlasRelatedObjectId) {
AtlasRelatedObjectId objId = (AtlasRelatedObjectId) obj;
objId.setGuid(getAssignedGuid(objId.getGuid()));
} else if (obj instanceof AtlasObjectId) {
AtlasObjectId objId = (AtlasObjectId) obj;
objId.setGuid(getAssignedGuid(objId.getGuid()));
} else if (obj instanceof Map) {
Map objId = (Map) obj;
Object guid = objId.get(KEY_GUID);
if (guid != null) {
objId.put(KEY_GUID, getAssignedGuid(guid.toString()));
}
}
}
private void addToPostUpdate(AtlasEntity entity, String attrName, Object attrVal) {
if (LOG.isDebugEnabled()) {
LOG.debug("addToPostUpdate(guid={}, entityType={}, attrName={}", entity.getGuid(), entity.getTypeName(), attrName);
}
AtlasEntity partialEntity = null;
if (postUpdateEntities == null) {
postUpdateEntities = new ArrayList<>();
}
for (AtlasEntity existing : postUpdateEntities) {
if (StringUtils.equals(entity.getGuid(), existing.getGuid())) {
partialEntity = existing;
break;
}
}
if (partialEntity == null) {
partialEntity = new AtlasEntity(entity.getTypeName(), attrName, attrVal);
partialEntity.setGuid(entity.getGuid());
postUpdateEntities.add(partialEntity);
} else {
partialEntity.setAttribute(attrName, attrVal);
}
}
}