ATLAS-2198: fix for Hive Hook OOM for large notification messages
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index aca5645..f815773 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -59,6 +59,7 @@
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@@ -698,7 +699,8 @@
}
colLineageProcessInstances.add(0, processReferenceable);
entities.addAll(colLineageProcessInstances);
- event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
+
+ addEntityUpdateNotificationMessagess(event, entities);
} else {
LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
}
@@ -711,6 +713,13 @@
}
}
+ private void addEntityUpdateNotificationMessagess(final HiveEventContext event, final Collection<Referenceable> entities) {
+ // process each entity as separate message to avoid running into OOM errors
+ for (Referenceable entity : entities) {
+ event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entity));
+ }
+ }
+
private <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed,
SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws AtlasHookException {
try {
@@ -801,7 +810,8 @@
entities.addAll(tables.values());
entities.add(processReferenceable);
- event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
+
+ addEntityUpdateNotificationMessagess(event, entities);
}
}