IMPALA-9257: Last event id should be advanced if all events are skipped

Events processor implements a filtering method which skips certain
unnecessary events (eg. events on blacklisted dbs and tables).
However, if the received batch has all the events which are filtered
out, it fails to update its lastSyncedEventId. This causes unnecessary
logs being printed in catalog and the same event batch being fetched
repeatedly.

Testing:
Modified existing test to compare event id after events on blacklisted
dbs and tables.

Change-Id: I7f94c1a8e8c221f504262d5591cda8c3a25c0c32
Reviewed-on: http://gerrit.cloudera.org:8080/14916
Reviewed-by: Anurag Mantripragada <anurag@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 22c1237..ac50562 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -584,19 +584,23 @@
   /**
    * Process the given list of notification events. Useful for tests which provide a list
    * of events
-   *
-   * @return the last Notification event which was processed.
    */
   @VisibleForTesting
   protected void processEvents(List<NotificationEvent> events)
       throws MetastoreNotificationException {
     NotificationEvent lastProcessedEvent = null;
+    // update the events received metric before returning
+    metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
+    if (events.isEmpty()) return;
     final Timer.Context context =
         metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
-    metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
     try {
       List<MetastoreEvent> filteredEvents =
           metastoreEventFactory_.getFilteredEvents(events);
+      if (filteredEvents.isEmpty()) {
+        lastSyncedEventId_.set(events.get(events.size() - 1).getEventId());
+        return;
+      }
       for (MetastoreEvent event : filteredEvents) {
         // synchronizing each event processing reduces the scope of the lock so the a
         // potential reset() during event processing is not blocked for longer than
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 0b7103b..7d67901 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -52,6 +52,7 @@
     event processor does not error out
     """
     try:
+      event_id_before = EventProcessorUtils.get_last_synced_event_id()
       self.run_stmt_in_hive("create database testBlackListedDb")
       self.run_stmt_in_hive("create table testBlackListedDb.testtbl (id int)")
       self.run_stmt_in_hive(
@@ -62,6 +63,7 @@
       # wait until all the events generated above are processed
       EventProcessorUtils.wait_for_event_processing(self.hive_client)
       assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+      assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
     finally:
       self.run_stmt_in_hive("drop database testBlackListedDb cascade")
       self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl")