ATLAS-3279: avoid unncessary retrieval of entity-extended info while sending notifications
Change-Id: I82e0bba27010709c74cd98a93f8a9c617577535e
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index caa6604..0f2b4bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -271,11 +271,11 @@
}
}
- private AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
+ public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
return getAndCacheEntity(guid, true);
}
- private AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
+ public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntity entity = context.getEntity(guid);
@@ -294,7 +294,7 @@
return entity;
}
- private AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
+ public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index 2e47a50..c910d9e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -92,7 +92,7 @@
}
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) {
+ if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
}
@@ -118,7 +118,7 @@
}
public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) {
+ if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
}
@@ -145,75 +145,76 @@
} else {
updateFullTextMapping(entity.getGuid(), addedClassifications);
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(addedClassifications);
+ if (instanceConverter != null) {
+ Referenceable entityRef = toReferenceable(entity.getGuid());
+ List<Struct> traits = toStruct(addedClassifications);
- if (entity == null || CollectionUtils.isEmpty(traits)) {
- return;
- }
+ if (entity == null || CollectionUtils.isEmpty(traits)) {
+ return;
+ }
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsAdded(entityRef, traits);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ listener.onTraitsAdded(entityRef, traits);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
+ }
}
}
}
}
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled) {
- doFullTextMapping(entity.getGuid());
+ doFullTextMapping(entity.getGuid());
+ if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsUpdated(entity, updatedClassifications);
}
} else {
- doFullTextMapping(entity.getGuid());
+ if (instanceConverter != null) {
+ Referenceable entityRef = toReferenceable(entity.getGuid());
+ List<Struct> traits = toStruct(updatedClassifications);
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(updatedClassifications);
+ if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+ return;
+ }
- if (entityRef == null || CollectionUtils.isEmpty(traits)) {
- return;
- }
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsUpdated(entityRef, traits);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ listener.onTraitsUpdated(entityRef, traits);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
+ }
}
}
}
}
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled) {
- doFullTextMapping(entity.getGuid());
+ doFullTextMapping(entity.getGuid());
+ if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsDeleted(entity, deletedClassifications);
}
} else {
- doFullTextMapping(entity.getGuid());
+ if (instanceConverter != null) {
+ Referenceable entityRef = toReferenceable(entity.getGuid());
+ List<Struct> traits = toStruct(deletedClassifications);
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(deletedClassifications);
+ if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) {
+ return;
+ }
- if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) {
- return;
- }
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsDeleted(entityRef, traits);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ listener.onTraitsDeleted(entityRef, traits);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
+ }
}
}
-
}
}
@@ -223,7 +224,7 @@
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermAdded(term, entityIds);
}
- } else {
+ } else if (instanceConverter != null) {
List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) {
@@ -242,7 +243,7 @@
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermDeleted(term, entityIds);
}
- } else {
+ } else if (instanceConverter != null) {
List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) {
@@ -277,7 +278,7 @@
continue;
}
- AtlasEntity entity = instanceConverter.getAndCacheEntity(guid);
+ AtlasEntity entity = fullTextMapperV2.getAndCacheEntity(guid);
if (entity == null) {
continue;
@@ -300,11 +301,15 @@
return;
}
+ MetricRecorder metric = RequestContext.get().startMetricRecord("notifyListeners");
+
if (isV2EntityNotificationEnabled) {
notifyV2Listeners(entityHeaders, operation, isImport);
} else {
notifyV1Listeners(entityHeaders, operation, isImport);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private void notifyRelationshipListeners(List<AtlasRelationship> relationships, EntityOperation operation, boolean isImport) throws AtlasBaseException {
@@ -322,24 +327,26 @@
private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
- List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
+ if (instanceConverter != null) {
+ List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- switch (operation) {
- case CREATE:
- listener.onEntitiesAdded(typedRefInsts, isImport);
- break;
- case UPDATE:
- case PARTIAL_UPDATE:
- listener.onEntitiesUpdated(typedRefInsts, isImport);
- break;
- case DELETE:
- listener.onEntitiesDeleted(typedRefInsts, isImport);
- break;
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ switch (operation) {
+ case CREATE:
+ listener.onEntitiesAdded(typedRefInsts, isImport);
+ break;
+ case UPDATE:
+ case PARTIAL_UPDATE:
+ listener.onEntitiesUpdated(typedRefInsts, isImport);
+ break;
+ case DELETE:
+ listener.onEntitiesDeleted(typedRefInsts, isImport);
+ break;
+ }
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString());
}
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString());
}
}
}
@@ -383,17 +390,19 @@
}
}
- private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
+ private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
- // delete notifications don't need all attributes. Hence the special handling for delete operation
- if (operation == EntityOperation.DELETE) {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes()));
- }
- } else {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- ret.add(toReferenceable(entityHeader.getGuid()));
+ if (instanceConverter != null) {
+ // delete notifications don't need all attributes. Hence the special handling for delete operation
+ if (operation == EntityOperation.DELETE) {
+ for (AtlasEntityHeader entityHeader : entityHeaders) {
+ ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes()));
+ }
+ } else {
+ for (AtlasEntityHeader entityHeader : entityHeaders) {
+ ret.add(toReferenceable(entityHeader.getGuid()));
+ }
}
}
@@ -403,7 +412,7 @@
private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(entityIds)) {
+ if (instanceConverter != null && CollectionUtils.isNotEmpty(entityIds)) {
for (AtlasRelatedObjectId relatedObjectId : entityIds) {
String entityGuid = relatedObjectId.getGuid();
@@ -417,17 +426,17 @@
private Referenceable toReferenceable(String entityId) throws AtlasBaseException {
Referenceable ret = null;
- if (StringUtils.isNotEmpty(entityId)) {
+ if (instanceConverter != null && StringUtils.isNotEmpty(entityId)) {
ret = instanceConverter.getReferenceable(entityId);
}
return ret;
}
- private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException {
+ private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException {
List<Struct> ret = null;
- if (classifications != null) {
+ if (instanceConverter != null && classifications != null) {
ret = new ArrayList<>(classifications.size());
for (AtlasClassification classification : classifications) {
@@ -468,7 +477,7 @@
} else {
String entityGuid = entityHeader.getGuid();
- entity = instanceConverter.getAndCacheEntity(entityGuid);
+ entity = fullTextMapperV2.getAndCacheEntity(entityGuid);
}
if (entity != null) {
@@ -545,6 +554,10 @@
}
private void doFullTextMapping(String guid) {
+ if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() || !AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
+ return;
+ }
+
AtlasEntityHeader entityHeader = new AtlasEntityHeader();
entityHeader.setGuid(guid);