ATLAS-2198: fix for Hive Hook OOM for large notification messages

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index aca5645..f815773 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -59,6 +59,7 @@
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
@@ -698,7 +699,8 @@
                     }
                     colLineageProcessInstances.add(0, processReferenceable);
                     entities.addAll(colLineageProcessInstances);
-                    event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
+
+                    addEntityUpdateNotificationMessagess(event, entities);
                 } else {
                     LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
                 }
@@ -711,6 +713,13 @@
         }
     }
 
+    private void addEntityUpdateNotificationMessagess(final HiveEventContext event, final Collection<Referenceable> entities) {
+        // process each entity as separate message to avoid running into OOM errors
+        for (Referenceable entity : entities) {
+            event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entity));
+        }
+    }
+
     private  <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed,
         SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws AtlasHookException {
         try {
@@ -801,7 +810,8 @@
 
             entities.addAll(tables.values());
             entities.add(processReferenceable);
-            event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
+
+            addEntityUpdateNotificationMessagess(event, entities);
         }
     }