IMPALA-9052: Events processor should skip blacklisted database and table

IMPALA-8797 introduced a configuration that can blacklist certain
databases and tables so that the catalog never loads them. This is useful
for skipping Hive's internal book-keeping tables, which are unreadable by
Impala (such as the information schema and sys tables). However, if
events are generated on such tables, the events processor goes into an
error state because the database does not exist in the catalog.

This patch makes the events processor filter out and ignore events on
blacklisted databases and tables.

Testing:
1. Added a new test case which makes sure that the events processor is
not in an error state after receiving events on blacklisted tables and
dbs.
2. Ran MetastoreEventsProcessorTest.

Change-Id: Ic5a53b722e6225e9cad9954f447f821c2c677e60
Reviewed-on: http://gerrit.cloudera.org:8080/14457
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 9909e9d..0827cd6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -211,14 +211,24 @@
       for (NotificationEvent event : events) {
         metastoreEvents.add(get(event));
       }
-      Iterator<MetastoreEvent> it = metastoreEvents.iterator();
       // filter out the create events which has a corresponding drop event later
       int sizeBefore = metastoreEvents.size();
       int numFilteredEvents = 0;
       int i = 0;
       while (i < metastoreEvents.size()) {
         MetastoreEvent currentEvent = metastoreEvents.get(i);
-        if (currentEvent.isRemovedAfter(metastoreEvents.subList(i + 1,
+        String eventDb = currentEvent.getDbName();
+        String eventTbl = currentEvent.getTableName();
+        // if the event is on blacklisted db or table we should filter it out
+        if (catalog_.isBlacklistedDb(eventDb) || (eventTbl != null && catalog_
+            .isBlacklistedTable(eventDb, eventTbl))) {
+          String blacklistedObject = eventTbl != null ? new TableName(eventDb,
+              eventTbl).toString() : eventDb;
+          LOG.info(currentEvent.debugString("Filtering out this event since it is on a "
+              + "blacklisted database or table %s", blacklistedObject));
+          metastoreEvents.remove(i);
+          numFilteredEvents++;
+        } else if (currentEvent.isRemovedAfter(metastoreEvents.subList(i + 1,
             metastoreEvents.size()))) {
           LOG.info(currentEvent.debugString("Filtering out this event since the object is "
               + "either removed or renamed later in the event stream"));
@@ -300,6 +310,10 @@
       this.metrics_ = metrics;
     }
 
+    public String getDbName() { return dbName_; }
+
+    public String getTableName() { return tblName_; }
+
     /**
      * Process this event if it is enabled based on the flags on this object
      *
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 9605381..432d421 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -41,6 +41,32 @@
   PROCESSING_TIMEOUT_S = 10
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--blacklisted_dbs=testBlackListedDb "
+                 "--blacklisted_tables=functional_parquet.testBlackListedTbl",
+    catalogd_args="--blacklisted_dbs=testBlackListedDb "
+                  "--blacklisted_tables=functional_parquet.testBlackListedTbl "
+                  "--hms_event_polling_interval_s=1")
+  def test_events_on_blacklisted_objects(self):
+    """Executes hive queries on blacklisted database and tables and makes sure that
+    event processor does not error out
+    """
+    try:
+      self.run_stmt_in_hive("create database testBlackListedDb")
+      self.run_stmt_in_hive("create table testBlackListedDb.testtbl (id int)")
+      self.run_stmt_in_hive(
+        "create table functional_parquet.testBlackListedTbl (id int, val string)"
+        " partitioned by (part int) stored as parquet")
+      self.run_stmt_in_hive(
+        "alter table functional_parquet.testBlackListedTbl add partition (part=1)")
+      # wait until all the events generated above are processed
+      EventProcessorUtils.wait_for_event_processing(self.hive_client)
+      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    finally:
+      self.run_stmt_in_hive("drop database testBlackListedDb cascade")
+      self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl")
+
+  @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
   @SkipIfHive2.acid
   def test_insert_events_transactional(self):