IMPALA-9101: Add support for detecting self-events on partition events

This commit redoes some of the self-event detection logic, specifically
for the partition events. Before the patch, the self-event identifiers
for a partition were stored at a table level when generating the
partition events. This was problematic since unlike ADD_PARTITION and
DROP_PARTITION event, ALTER_PARTITION event is generated one per
partition. Due to this if there are multiple ALTER_PARTITION events
generated, only the first event is identified as a self-event and the
rest of the events are processed. This patch fixes this by adding the
self-event identifiers to each partition so that when the event is later
received, each ALTER_PARTITION uses the state stored in HdfsPartition to
valuate the self-events. The patch makes sure that the event processor
takes a table lock during self-event evaluation to avoid races with
other parts of the code which try to modify the table at the same time.

Additionally, this patch also changes the event processor to refresh a
loaded table (incomplete tables are not refreshed) when a ALTER_TABLE
event is received instead of invalidating the table. This makes the
events processor consistent with respect to all the other event types.
In future, we should add a flag to choose the behavior preference
(prefer invalidate or refresh).

Also, this patch fixes the following related issues:
1. Self-event logic was not triggered for alter database events when
user modifies the comment on the database.
2. In case of queries like "alter table add if not exists partition...",
the partition is not added since its pre-existing. The self-event
identifiers should not be added in such cases since no event is expected
from such queries.
3. Changed wait_for_event_processing test util method in
EventProcessorUtils to use a more deterministic way to determine if the
catalog updates have propogated to impalad instead of waiting for a
random duration of time.  This also speeds up the event processing tests
significantly.

Testing Done:
1. Added a e2e self-events test which runs multiple impala
queries and makes sure that the event is skips processing.
2. Ran MetastoreEventsProcessorTest
3. Ran core tests on CDH and CDP builds.

Change-Id: I9b4148f6be0f9f946c8ad8f314d64b095731744c
Reviewed-on: http://gerrit.cloudera.org:8080/14799
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 2b603f1..0670c3c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -51,7 +51,9 @@
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
 import org.apache.impala.catalog.events.NoOpEventProcessor;
+import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -390,8 +392,10 @@
     return metastoreEventProcessor_;
   }
 
-  public boolean isExternalEventProcessingEnabled() {
-    return !(metastoreEventProcessor_ instanceof NoOpEventProcessor);
+  public boolean isEventProcessingActive() {
+    return metastoreEventProcessor_ instanceof MetastoreEventsProcessor
+        && EventProcessorStatus.ACTIVE
+        .equals(((MetastoreEventsProcessor) metastoreEventProcessor_).getStatus());
   }
 
   /**
@@ -785,7 +789,7 @@
    */
   public List<Long> getInFlightVersionsForEvents(String dbName, String tblName)
       throws DatabaseNotFoundException, TableNotFoundException {
-    Preconditions.checkState(isExternalEventProcessingEnabled(),
+    Preconditions.checkState(isEventProcessingActive(),
         "Event processing should be enabled before calling this method");
     List<Long> result = Collections.EMPTY_LIST;
     versionLock_.readLock().lock();
@@ -811,36 +815,73 @@
   }
 
   /**
-   * Removes a given version number from the catalog database/table's list of versions
-   * for in-flight events.
-   * If tblName is null, removes version number from database.
-   * If tblName not null and table is not incomplete, removes version number from table
-   * Applicable only when external event processing is enabled.
-   * @param dbName database name
-   * @param tblName table name
+   * Evaluates if the information from an event (serviceId and versionNumber) matches to
+   * the catalog object. If there is match, the in-flight version for that object is
+   * removed and method returns true. If it does not match, returns false
+   * @param ctx self context which provides all the information needed to
+   * evaluate if this is a self-event or not
+   * @return true if given event information evaluates to a self-event, false otherwise
    */
-  public void removeFromInFlightVersionsForEvents(String dbName, String tblName,
-      long versionNumber) throws DatabaseNotFoundException, TableNotFoundException {
-    Preconditions.checkState(isExternalEventProcessingEnabled(),
+  public boolean evaluateSelfEvent(SelfEventContext ctx)
+      throws CatalogException {
+    Preconditions.checkState(isEventProcessingActive(),
         "Event processing should be enabled when calling this method");
-    versionLock_.writeLock().lock();
-    try {
-      Db db = getDb(dbName);
-      if (db == null) return;
-      if (tblName == null) {
-        db.removeFromVersionsForInflightEvents(versionNumber);
-        return;
-      }
-      Table tbl = getTable(dbName, tblName);
-      if (tbl == null) {
-        throw new TableNotFoundException(
-            String.format("Table %s not found", new TableName(dbName, tblName)));
-      }
-      if (tbl instanceof IncompleteTable) return;
-      tbl.removeFromVersionsForInflightEvents(versionNumber);
-    } finally {
-      versionLock_.writeLock().unlock();
+    long versionNumber = ctx.getVersionNumberFromEvent();
+    String serviceIdFromEvent = ctx.getServiceIdFromEvent();
+    // no version info or service id in the event
+    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) return false;
+    // if the service id from event doesn't match with our service id this is not a
+    // self-event
+    if (!getCatalogServiceId().equals(serviceIdFromEvent)) return false;
+    Db db = getDb(ctx.getDbName());
+    if (db == null) {
+      throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " not found");
     }
+    // if the given tblName is null we look db's in-flight events
+    if (ctx.getTblName() == null) {
+      return db.removeFromVersionsForInflightEvents(versionNumber);
+    }
+    Table tbl = getTable(ctx.getDbName(), ctx.getTblName());
+    if (tbl == null) {
+      throw new TableNotFoundException(
+          String.format("Table %s.%s not found", ctx.getDbName(), ctx.getTblName()));
+    }
+    // we should acquire the table lock so that we wait for any other updates
+    // happening to this table at the same time
+    if (!tryLockTable(tbl)) {
+      throw new CatalogException(String.format("Error during self-event evaluation "
+          + "for table %s due to lock contention", tbl.getFullName()));
+    }
+    versionLock_.writeLock().unlock();
+    try {
+      List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
+      // if the partitionKeyValues is null, we look for tbl's in-flight events
+      if (partitionKeyValues == null) {
+        return tbl.removeFromVersionsForInflightEvents(versionNumber);
+      }
+      if (tbl instanceof HdfsTable) {
+        List<String> failingPartitions = new ArrayList<>();
+        for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
+          HdfsPartition hdfsPartition =
+              ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
+          if (hdfsPartition == null || !hdfsPartition
+              .removeFromVersionsForInflightEvents(versionNumber)) {
+            // even if this is an error condition we should not bail out early since we
+            // should clean up the self-event state on the rest of the partitions
+            String partName = HdfsTable.constructPartitionName(partitionKeyValue);
+            if (hdfsPartition == null) {
+              LOG.warn(String.format("Partition %s not found during self-event "
+                + "evaluation for the table %s", partName, tbl.getFullName()));
+            }
+            failingPartitions.add(partName);
+          }
+        }
+        return failingPartitions.isEmpty();
+      }
+    } finally {
+      tbl.getLock().unlock();
+    }
+    return false;
   }
 
   /**
@@ -851,14 +892,12 @@
    * @param versionNumber version number to be added
    */
   public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
-    if (!isExternalEventProcessingEnabled()) return;
-    versionLock_.writeLock().lock();
-    try {
-      if (tbl instanceof IncompleteTable) return;
-      tbl.addToVersionsForInflightEvents(versionNumber);
-    } finally {
-      versionLock_.writeLock().unlock();
-    }
+    if (!isEventProcessingActive()) return;
+    // we generally don't take locks on Incomplete tables since they are atomically
+    // replaced during load
+    Preconditions.checkState(
+        tbl instanceof IncompleteTable || tbl.getLock().isHeldByCurrentThread());
+    tbl.addToVersionsForInflightEvents(versionNumber);
   }
 
   /**
@@ -869,13 +908,8 @@
    * @param versionNumber version number to be added
    */
   public void addVersionsForInflightEvents(Db db, long versionNumber) {
-    if (!isExternalEventProcessingEnabled()) return;
-    versionLock_.writeLock().lock();
-    try {
-      db.addToVersionsForInflightEvents(versionNumber);
-    } finally {
-      versionLock_.writeLock().unlock();
-    }
+    if (!isEventProcessingActive()) return;
+    db.addToVersionsForInflightEvents(versionNumber);
   }
 
   /**
@@ -2214,17 +2248,26 @@
   }
 
   /**
-   * Refresh partition if it exists. Returns true if reload of the partition succeeds,
-   * false otherwise.
-   * @throws CatalogException if partition reload is unsuccessful.
+   * Refresh partition if it exists.
+   *
+   * @return true if partition was reloaded, else false.
+   * @throws CatalogException if partition reload threw an error.
    * @throws DatabaseNotFoundException if Db doesn't exist.
+   * @throws TableNotFoundException if table doesn't exist.
+   * @throws TableNotLoadedException if table is not loaded in Catalog.
    */
   public boolean reloadPartitionIfExists(String dbName, String tblName,
       List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
     Table table = getTable(dbName, tblName);
-    if (table == null || table instanceof IncompleteTable) return false;
-    reloadPartition(table, tPartSpec, reason);
-    return true;
+    if (table == null) {
+      throw new TableNotFoundException(dbName + "." + tblName + " not found");
+    }
+    if (table instanceof IncompleteTable) {
+      throw new TableNotLoadedException(dbName + "." + tblName + " is not loaded");
+    }
+    Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
+    reloadPartition(table, tPartSpec, wasPartitionRefreshed, reason);
+    return wasPartitionRefreshed.getRef();
   }
 
   /**
@@ -2555,7 +2598,8 @@
    * the partition metadata was reloaded.
    */
   public TCatalogObject reloadPartition(Table tbl,
-      List<TPartitionKeyValue> partitionSpec, String reason) throws CatalogException {
+      List<TPartitionKeyValue> partitionSpec,
+      Reference<Boolean> wasPartitionReloaded, String reason) throws CatalogException {
     if (!tryLockTable(tbl)) {
       throw new CatalogException(String.format("Error reloading partition of table %s " +
           "due to lock contention", tbl.getFullName()));
@@ -2564,6 +2608,7 @@
       long newCatalogVersion = incrementAndGetCatalogVersion();
       versionLock_.writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) tbl;
+      wasPartitionReloaded.setRef(false);
       HdfsPartition hdfsPartition = hdfsTable
           .getPartitionFromThriftPartitionSpec(partitionSpec);
       // Retrieve partition name from existing partition or construct it from
@@ -2584,6 +2629,12 @@
           if (hdfsPartition != null) {
             hdfsTable.dropPartition(partitionSpec);
             hdfsTable.setCatalogVersion(newCatalogVersion);
+            // non-existing partition was dropped from catalog, so we mark it as refreshed
+            wasPartitionReloaded.setRef(true);
+          } else {
+            LOG.info(String.format("Partition metadata for %s was not refreshed since "
+                    + "it does not exist in metastore anymore",
+                hdfsTable.getFullName() + " " + partitionName));
           }
           return hdfsTable.toTCatalogObject();
         } catch (Exception e) {
@@ -2593,6 +2644,7 @@
         hdfsTable.reloadPartition(msClient.getHiveClient(), hdfsPartition, hmsPartition);
       }
       hdfsTable.setCatalogVersion(newCatalogVersion);
+      wasPartitionReloaded.setRef(true);
       LOG.info(String.format("Refreshed partition metadata: %s %s",
           hdfsTable.getFullName(), partitionName));
       return hdfsTable.toTCatalogObject();
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 455e0f0..4bfd070 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.KuduPartitionParam;
+import org.apache.impala.catalog.events.InFlightEvents;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObject;
@@ -96,13 +97,8 @@
   // (e.g. can't drop it, can't add tables to it, etc).
   private boolean isSystemDb_ = false;
 
-  // maximum number of catalog versions to store for in-flight events for this database
-  private static final int MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
-
-  // FIFO list of versions for all the in-flight metastore events in this database
-  // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
-  // is attempted to be added to this list when its at maximum capacity is ignored
-  private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
+  // tracks the in-flight metastore events for this db
+  private final InFlightEvents inFlightEvents_ = new InFlightEvents();
 
   public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
     setMetastoreDb(name, msDb);
@@ -504,7 +500,7 @@
    * Gets the current list of versions for in-flight events for this database
    */
   public List<Long> getVersionsForInflightEvents() {
-    return Collections.unmodifiableList(versionsForInflightEvents_);
+    return inFlightEvents_.getAll();
   }
 
   /**
@@ -513,29 +509,22 @@
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
-    return versionsForInflightEvents_.remove(versionNumber);
+    return inFlightEvents_.remove(versionNumber);
   }
 
   /**
    * Adds a version number to the collection of versions for in-flight events. If the
    * collection is already at the max size defined by
-   * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
-   * does not add it
+   * <code>InflightEvents.MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the
+   * given version and does not add it
    * @param versionNumber version number to add
-   * @return True if version number was added, false if the collection is at its max
-   * capacity
    */
-  public boolean addToVersionsForInflightEvents(long versionNumber) {
-    if (versionsForInflightEvents_.size() >= MAX_NUMBER_OF_INFLIGHT_EVENTS) {
-      LOG.warn(String.format("Number of versions to be stored for database %s is at "
-              + " its max capacity %d. Ignoring add request for version number %d. This "
-              + "could cause unnecessary database invalidation when the event is "
-              + "processed",
-          getName(), MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
-      return false;
+  public void addToVersionsForInflightEvents(long versionNumber) {
+    if (!inFlightEvents_.add(versionNumber)) {
+      LOG.warn(String.format("Could not add version %s to the list of in-flight "
+          + "events. This could cause unnecessary database %s invalidation when the "
+          + "event is processed", versionNumber, getName()));
     }
-    versionsForInflightEvents_.add(versionNumber);
-    return true;
   }
 
   @Override // FeDb
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 8832ce8..d0664df 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -37,6 +37,8 @@
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.PartitionKeyValue;
+import org.apache.impala.catalog.events.InFlightEvents;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -618,6 +620,8 @@
   // -1 means writeId_ is irrelevant(not supported).
   private long writeId_ = -1L;
 
+  private final InFlightEvents inFlightEvents_ = new InFlightEvents(20);
+
   private HdfsPartition(HdfsTable table,
       org.apache.hadoop.hive.metastore.api.Partition msPartition,
       List<LiteralExpr> partitionKeyValues,
@@ -643,6 +647,7 @@
     } else {
       hmsParameters_ = new HashMap<>();
     }
+    addInflightVersionsFromParameters();
     extractAndCompressPartStats();
     // Intern parameters after removing the incremental stats
     hmsParameters_ = CatalogInterners.internParameters(hmsParameters_);
@@ -846,6 +851,48 @@
   }
 
   /**
+   * Gets the current list of versions for in-flight events for this partition
+   */
+  public List<Long> getVersionsForInflightEvents() {
+    return inFlightEvents_.getAll();
+  }
+
+  /**
+   * Removes a given version from the in-flight events
+   * @param versionNumber version number to remove
+   * @return true if the versionNumber was removed, false if it didn't exist
+   */
+  public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    return inFlightEvents_.remove(versionNumber);
+  }
+
+  /**
+   * Adds a version number to the in-flight events of this partition
+   * @param versionNumber version number to add
+   */
+  public void addToVersionsForInflightEvents(long versionNumber) {
+    if (!inFlightEvents_.add(versionNumber)) {
+      LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
+          + "This could cause unnecessary refresh of the partition when the event is"
+          + "received by the Events processor.", versionNumber, getPartitionName(),
+          getTable().getFullName()));
+    }
+  }
+
+  /**
+   * Adds the version from the given Partition parameters. No-op if the parameters does
+   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>
+   */
+  private void addInflightVersionsFromParameters() {
+    Preconditions.checkNotNull(hmsParameters_);
+    Preconditions.checkState(inFlightEvents_.size() == 0);
+    if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
+      return;
+    }
+    inFlightEvents_.add(Long.parseLong(
+            hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
+  }
+  /**
    * Marks this partition's metadata as "dirty" indicating that changes have been
    * made and this partition's metadata should not be reused during the next
    * incremental metadata refresh.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index ed3b5ad..7df8d18 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -20,7 +20,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.events.InFlightEvents;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
@@ -141,13 +141,9 @@
   // TODO(todd) this should probably be a ValidWriteIdList in memory instead of a String.
   protected String validWriteIds_ = null;
 
-  // maximum number of catalog versions to store for in-flight events for this table
-  private static final int MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
-
-  // FIFO list of versions for all the in-flight metastore events in this table
-  // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
-  // is attempted to be added to this list when its at maximum capacity is ignored
-  private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
+  // tracks the in-flight metastore events for this table. Used by Events processor to
+  // avoid unnecessary refresh when the event is received
+  private final InFlightEvents inFlightEvents = new InFlightEvents();
 
   // Table metrics. These metrics are applicable to all table types. Each subclass of
   // Table can define additional metrics specific to that table type.
@@ -810,7 +806,7 @@
    * Gets the current list of versions for in-flight events for this table
    */
   public List<Long> getVersionsForInflightEvents() {
-    return Collections.unmodifiableList(versionsForInflightEvents_);
+    return inFlightEvents.getAll();
   }
 
   /**
@@ -819,7 +815,7 @@
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
-    return versionsForInflightEvents_.remove(versionNumber);
+    return inFlightEvents.remove(versionNumber);
   }
 
   /**
@@ -831,16 +827,12 @@
    * @return True if version number was added, false if the collection is at its max
    * capacity
    */
-  public boolean addToVersionsForInflightEvents(long versionNumber) {
-    if (versionsForInflightEvents_.size() == MAX_NUMBER_OF_INFLIGHT_EVENTS) {
-      LOG.warn(String.format("Number of versions to be stored for table %s is at "
-              + " its max capacity %d. Ignoring add request for version number %d. This "
-              + "could cause unnecessary table invalidation when the event is processed",
-          getFullName(), MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
-      return false;
+  public void addToVersionsForInflightEvents(long versionNumber) {
+    if (!inFlightEvents.add(versionNumber)) {
+      LOG.warn(String.format("Could not add %s version to the table %s. This could "
+          + "cause unnecessary refresh of the table when the event is received by the "
+              + "Events processor.", versionNumber, getFullName()));
     }
-    versionsForInflightEvents_.add(versionNumber);
-    return true;
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableNotLoadedException.java b/fe/src/main/java/org/apache/impala/catalog/TableNotLoadedException.java
new file mode 100644
index 0000000..cc8f78c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/TableNotLoadedException.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+/**
+ * Thrown when table is not loaded in Catalog.
+ */
+public class TableNotLoadedException extends CatalogException {
+
+  public TableNotLoadedException(String msg) {super(msg);}
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
new file mode 100644
index 0000000..1aed6d1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to keep track of the in-flight events after a DDL operation is
+ * completed. The MetastoreEventsProcessor uses this information to determine if a
+ * received event is self-generated or not. Not thread-safe.
+ */
+public class InFlightEvents {
+
+  // maximum number of catalog versions to store for in-flight events for this table
+  private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
+
+  private static final Logger LOG = LoggerFactory.getLogger(InFlightEvents.class);
+  // FIFO list of versions for all the in-flight metastore events in this table
+  // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
+  // is attempted to be added to this list when its at maximum capacity is ignored
+  private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
+
+  // maximum number of versions to store
+  private final int capacity_;
+
+  public InFlightEvents() {
+    this.capacity_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+  }
+
+  public InFlightEvents(int capacity) {
+    Preconditions.checkState(capacity > 0);
+    this.capacity_ = capacity;
+  }
+
+  /**
+   * Gets the current list of versions for in-flight events for this table
+   */
+  public List<Long> getAll() {
+    return ImmutableList.copyOf(versionsForInflightEvents_);
+  }
+
+  /**
+   * Removes a given version from the collection of version numbers for in-flight
+   * events.
+   *
+   * @param versionNumber version number to remove from the collection
+   * @return true if the version was found and successfully removed, false
+   * otherwise
+   */
+  public boolean remove(long versionNumber) {
+    return versionsForInflightEvents_.remove(versionNumber);
+  }
+
+  /**
+   * Adds a version number to the collection of versions for in-flight events. If the
+   * collection is already at the max size defined by
+   * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
+   * does not add it
+   *
+   * @param versionNumber version number to add
+   * @return True if version number was added, false if the collection is at its max
+   * capacity
+   */
+  public boolean add(long versionNumber) {
+    if (versionsForInflightEvents_.size() == capacity_) {
+      LOG.warn(String.format("Number of versions to be stored is at "
+              + " its max capacity %d. Ignoring add request for version number %d.",
+          DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
+      return false;
+    }
+    versionsForInflightEvents_.add(versionNumber);
+    return true;
+  }
+
+  public int size() {
+    return versionsForInflightEvents_.size();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index c8e1043..2be56e9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -23,8 +23,9 @@
 import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Map;
@@ -48,19 +49,19 @@
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.TableNotLoadedException;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AcidUtils;
-import org.apache.impala.util.ClassUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
-import org.apache.hadoop.hive.common.FileUtils;
 
 /**
  * Main class which provides Metastore event objects for various event types. Also
@@ -167,18 +168,10 @@
         case CREATE_DATABASE: return new CreateDatabaseEvent(catalog_, metrics_, event);
         case DROP_DATABASE: return new DropDatabaseEvent(catalog_, metrics_, event);
         case ALTER_DATABASE: return new AlterDatabaseEvent(catalog_, metrics_, event);
-        case ADD_PARTITION:
-          // add partition events triggers invalidate table currently
-          return new AddPartitionEvent(catalog_, metrics_, event);
-        case DROP_PARTITION:
-          // drop partition events triggers invalidate table currently
-          return new DropPartitionEvent(catalog_, metrics_, event);
-        case ALTER_PARTITION:
-          // alter partition events triggers invalidate table currently
-          return new AlterPartitionEvent(catalog_, metrics_, event);
-        case INSERT:
-          // Insert events trigger refresh on a table/partition currently
-          return new InsertEvent(catalog_, metrics_, event);
+        case ADD_PARTITION: return new AddPartitionEvent(catalog_, metrics_, event);
+        case DROP_PARTITION: return new DropPartitionEvent(catalog_, metrics_, event);
+        case ALTER_PARTITION: return new AlterPartitionEvent(catalog_, metrics_, event);
+        case INSERT: return new InsertEvent(catalog_, metrics_, event);
         default:
           // ignore all the unknown events by creating a IgnoredEvent
           return new IgnoredEvent(catalog_, metrics_, event);
@@ -287,15 +280,6 @@
     // metrics registry so that events can add metrics
     protected final Metrics metrics_;
 
-    // version number from the event object parameters used for self-event detection
-    protected long versionNumberFromEvent_ = -1;
-    // service id from the event object parameters used for self-event detection
-    protected String serviceIdFromEvent_ = null;
-    // the list of versions which this catalog expects to be see in the
-    // self-generated events in order. Anything which is seen out of order of this list
-    // will be used to determine that this is not a self-event and table will be
-    // invalidated. See <code>isSelfEvent</code> for more details.
-    protected List<Long> pendingVersionNumbersFromCatalog_ = Collections.EMPTY_LIST;
 
     MetastoreEvent(CatalogServiceCatalog catalogServiceCatalog, Metrics metrics,
         NotificationEvent event) {
@@ -414,6 +398,8 @@
      */
     protected abstract boolean isEventProcessingDisabled();
 
+    protected abstract SelfEventContext getSelfEventContext();
+
     /**
      * This method detects if this event is self-generated or not (see class
      * documentation of <code>MetastoreEventProcessor</code> to understand what a
@@ -436,56 +422,24 @@
      * @return True if this event is a self-generated event. If the returned value is
      * true, this method also clears the version number from the catalog database/table.
      * Returns false if the version numbers or service id don't match
-     * @throws CatalogException in case of exceptions while removing the version number
-     * from the database/table or when reading the values of version list from catalog
-     * database/table
      */
-    protected boolean isSelfEvent() throws CatalogException {
-      initSelfEventIdentifiersFromEvent();
-      if (versionNumberFromEvent_ == -1 || pendingVersionNumbersFromCatalog_.isEmpty()) {
-        return false;
-      }
-
-      // first check if service id is a match, then check if the event version is what we
-      // expect in the list
-      if (catalog_.getCatalogServiceId().equals(serviceIdFromEvent_)
-          && pendingVersionNumbersFromCatalog_.get(0).equals(versionNumberFromEvent_)) {
-        // we see this version for the first time. This is a self-event
-        // remove this version number from the catalog so that next time we see this
-        // version we don't determine it wrongly as self-event
-        // TODO we should improve by atomically doing this check and invalidating the
-        // table to avoid any races. Currently, it is possible the some other thread
-        // in CatalogService changes the pendingVersionNumbersFromCatalog after do
-        // the check here. However, there are only two possible operations that can
-        // happen from outside with respect to this version list. Either some thread
-        // adds a new version to the list after we looked at it, or the table is
-        // invalidated. In both the cases, it is OK since the new version added is
-        // guaranteed to be greater than versionNumberFromEvent and if the table is
-        // invalidated, this operation (this whole event) becomes a no-op
-        catalog_.removeFromInFlightVersionsForEvents(
-            dbName_, tblName_, versionNumberFromEvent_);
-        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
-        return true;
+    protected boolean isSelfEvent() {
+      try {
+        if (catalog_.evaluateSelfEvent(getSelfEventContext())) {
+          metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
+          return true;
+        }
+      } catch (CatalogException e) {
+        debugLog("Received exception {}. Ignoring self-event evaluation", e.getMessage());
       }
       return false;
     }
+  }
 
-    /**
-     * This method should be implemented by subclasses to initialize the values of
-     * self-event identifiers by parsing the event data. These identifiers are later
-     * used to determine if this event needs to be processed or not. See
-     * <code>isSelfEvent</code> method for details.
-     */
-    protected void initSelfEventIdentifiersFromEvent() {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", ClassUtil.getMethodName()));
-    }
-
-    protected static String getStringProperty(
-        Map<String, String> params, String key, String defaultVal) {
-      if (params == null) return defaultVal;
-      return params.getOrDefault(key, defaultVal);
-    }
+  public static String getStringProperty(
+      Map<String, String> params, String key, String defaultVal) {
+    if (params == null) return defaultVal;
+    return params.getOrDefault(key, defaultVal);
   }
 
   /**
@@ -504,8 +458,13 @@
       super(catalogServiceCatalog, metrics, event);
       Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
       tblName_ = Preconditions.checkNotNull(event.getTableName());
-      debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
-          getFullyQualifiedTblName());
+      if (MetastoreEventType.OTHER.equals(eventType_)) {
+        debugLog("Creating event {} of type {} ({}) on table {}", eventId_, eventType_,
+            event.getEventType(), getFullyQualifiedTblName());
+      } else {
+        debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
+            getFullyQualifiedTblName());
+      }
     }
 
 
@@ -518,20 +477,6 @@
     }
 
     /**
-     * Util method to issue invalidate on a given table on the catalog. This method
-     * atomically invalidates the table if it exists in the catalog. No-op if the table
-     * does not exist
-     */
-    protected boolean invalidateCatalogTable() {
-      boolean tableInvalidated =
-          catalog_.invalidateTableIfExists(dbName_, tblName_) != null;
-      if (tableInvalidated) {
-        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_INVALIDATES).inc();
-      }
-      return tableInvalidated;
-    }
-
-    /**
      * Checks if the table level property is set in the parameters of the table from the
      * event. If it is available, it takes precedence over database level flag for this
      * table. If the table level property is not set, returns the value from the database
@@ -604,34 +549,65 @@
       return tPartSpec;
     }
 
-    /**
-     * Util method to create a partition spec string out of a TPartitionKeyValue objects.
-     */
-    protected static String constructPartitionStringFromTPartitionSpec(
-        List<TPartitionKeyValue> tPartSpec) {
-      List<String> partitionCols = new ArrayList<>();
-      List<String> partitionVals = new ArrayList<>();
-      for (TPartitionKeyValue kv: tPartSpec) {
-        partitionCols.add(kv.getName());
-        partitionVals.add(kv.getValue());
-      }
-      String partString = FileUtils.makePartName(partitionCols, partitionVals);
-      return partString;
-    }
-
     /*
      * Helper function to initiate a table reload on Catalog. Re-throws the exception if
      * the catalog operation throws.
      */
-    protected void reloadTableFromCatalog(String operation) throws CatalogException {
-      if (!catalog_.reloadTableIfExists(dbName_, tblName_,
-              "Processing " + operation + " event from HMS")) {
-        debugLog("Automatic refresh on table {} failed as the table is not "
-            + "present either in catalog or metastore.", getFullyQualifiedTblName());
-      } else {
-        infoLog("Table {} has been refreshed after " + operation +".",
+    protected boolean reloadTableFromCatalog(String operation, boolean isTransactional)
+        throws CatalogException {
+      try {
+        if (!catalog_.reloadTableIfExists(dbName_, tblName_,
+            "Processing " + operation + " event from HMS")) {
+          debugLog("Automatic refresh on table {} failed as the table "
+              + "either does not exist anymore or is not in loaded state.",
+              getFullyQualifiedTblName());
+          return false;
+        }
+      } catch (DatabaseNotFoundException e) {
+        debugLog("Refresh table {} failed as "
+                + "the database was not present in the catalog.",
             getFullyQualifiedTblName());
+        return false;
       }
+      String tblStr = isTransactional ? "transactional table" : "table";
+      infoLog("Refreshed {} {}", tblStr, getFullyQualifiedTblName());
+      metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).inc();
+      return true;
+    }
+
+    /**
+     * Refreshes a partition provided by given spec only if the table is loaded
+     * @param tPartSpec
+     * @param reason Event type which caused the refresh, used for logging by catalog
+     * @return false if the table or database did not exist or was not loaded, else
+     * returns true.
+     * @throws CatalogException
+     */
+    protected boolean reloadPartition(List<TPartitionKeyValue> tPartSpec, String reason)
+        throws CatalogException {
+      try {
+        boolean result = catalog_.reloadPartitionIfExists(dbName_,
+            tblName_, tPartSpec, reason);
+        if (!result) {
+          debugLog("partition {} on table {} was not refreshed since it does not exist "
+                  + "in catalog anymore", HdfsTable.constructPartitionName(tPartSpec),
+              getFullyQualifiedTblName());
+        } else {
+          metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
+              .inc();
+          infoLog("Table {} partition {} has been refreshed", getFullyQualifiedTblName(),
+              HdfsTable.constructPartitionName(tPartSpec));
+        }
+        return true;
+      } catch (TableNotLoadedException e) {
+          debugLog("Partition {} on table {} was not refreshed since it is not loaded",
+              HdfsTable.constructPartitionName(tPartSpec), getFullyQualifiedTblName());
+      } catch (DatabaseNotFoundException | TableNotFoundException e) {
+        debugLog("Refresh of table {} partition {} "
+                + "event failed as the database or table is not present in the catalog.",
+            getFullyQualifiedTblName(), HdfsTable.constructPartitionName(tPartSpec));
+      }
+      return false;
     }
   }
 
@@ -686,6 +662,12 @@
       }
     }
 
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is unnecessary for"
+          + " this event type");
+    }
+
     /**
      * If the table provided in the catalog does not exist in the catalog, this method
      * will create it. If the table in the catalog already exists, it relies of the
@@ -782,6 +764,12 @@
       }
     }
 
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is not implemented"
+          + " for insert event type");
+    }
+
     /**
      * Currently we do not check for self-events in Inserts. Existing self-events logic
      * cannot be used for insert events since firing insert event does not allow us to
@@ -815,27 +803,13 @@
         // Ignore event if table or database is not in catalog. Throw exception if
         // refresh fails. If the partition does not exist in metastore the reload
         // method below removes it from the catalog
-        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-            "processing partition-level INSERT event from HMS")) {
-          debugLog("Refresh of table {} partition {} after insert "
-                  + "event failed as the table is not present in the catalog.",
-              getFullyQualifiedTblName(), (tPartSpec));
-        } else {
-          infoLog("Table {} partition {} has been refreshed after insert.",
-              getFullyQualifiedTblName(),
-              constructPartitionStringFromTPartitionSpec(tPartSpec));
-        }
-      } catch (DatabaseNotFoundException e) {
-        debugLog("Refresh of table {} partition {} for insert "
-                + "event failed as the database is not present in the catalog.",
-            getFullyQualifiedTblName(),
-            constructPartitionStringFromTPartitionSpec(tPartSpec));
+        reloadPartition(tPartSpec, "INSERT");
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                 + "partition on table {} partition {} failed. Event processing cannot "
-                + "continue. Issue and invalidate command to reset the event processor "
-                + "state.", getFullyQualifiedTblName(),
-            constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
+                + "continue. Issue an invalidate metadata command to reset the event "
+                + "processor state.", getFullyQualifiedTblName(),
+            HdfsTable.constructPartitionName(tPartSpec)), e);
       }
     }
 
@@ -848,10 +822,7 @@
       try {
         // Ignore event if table or database is not in the catalog. Throw exception if
         // refresh fails.
-        reloadTableFromCatalog("table-level INSERT");
-      } catch (DatabaseNotFoundException e) {
-        debugLog("Automatic refresh of table {} insert failed as the "
-            + "database is not present in the catalog.", getFullyQualifiedTblName());
+        reloadTableFromCatalog("INSERT", false);
       } catch (CatalogException e) {
         if (e instanceof TableLoadingException &&
             e.getCause() instanceof NoSuchObjectException) {
@@ -871,7 +842,7 @@
   /**
    * MetastoreEvent for ALTER_TABLE event type
    */
-  public static class AlterTableEvent extends TableInvalidatingEvent {
+  public static class AlterTableEvent extends MetastoreTableEvent {
     protected org.apache.hadoop.hive.metastore.api.Table tableBefore_;
     // the table object after alter operation, as parsed from the NotificationEvent
     protected org.apache.hadoop.hive.metastore.api.Table tableAfter_;
@@ -915,32 +886,14 @@
     }
 
     @Override
-    protected void initSelfEventIdentifiersFromEvent() {
-      versionNumberFromEvent_ = Long.parseLong(
-          getStringProperty(tableAfter_.getParameters(),
-              MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-      serviceIdFromEvent_ =
-          getStringProperty(tableAfter_.getParameters(),
-              MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
-      try {
-        if (isRename_) {
-          //if this is rename event then identifiers will be in tableAfter
-          pendingVersionNumbersFromCatalog_ = catalog_
-              .getInFlightVersionsForEvents(tableAfter_.getDbName(),
-                  tableAfter_.getTableName());
-        } else {
-          pendingVersionNumbersFromCatalog_ = catalog_
-              .getInFlightVersionsForEvents(msTbl_.getDbName(), msTbl_.getTableName());
-        }
-      } catch (TableNotFoundException | DatabaseNotFoundException e) {
-        debugLog("Received exception {}. Ignoring self-event evaluation",
-            e.getMessage());
-      }
+    protected SelfEventContext getSelfEventContext() {
+      return new SelfEventContext(tableAfter_.getDbName(), tableAfter_.getTableName(),
+          tableAfter_.getParameters());
     }
 
     /**
      * If the ALTER_TABLE event is due a table rename, this method removes the old table
-     * and creates a new table with the new name. Else, this just issues a invalidate
+     * and creates a new table with the new name. Else, this just issues a refresh
      * table on the tblName from the event
      */
     @Override
@@ -948,8 +901,7 @@
       // Determine whether this is an event which we have already seen or if it is a new
       // event
       if (isSelfEvent()) {
-        infoLog("Not processing the event as it is a self-event or "
-            + "update is already present in catalog.");
+        infoLog("Not processing the event as it is a self-event");
         return;
       }
       // Ignore the event if this is a trivial event. See javadoc for
@@ -960,12 +912,12 @@
         return;
       }
       // in case of table level alters from external systems it is better to do a full
-      // invalidate  eg. this could be due to as simple as adding a new parameter or a
-      // full blown adding  or changing column type
-      // detect the special where a table is renamed
+      // refresh  eg. this could be due to as simple as adding a new parameter or a
+      // full blown adding or changing column type
+      // detect the special case where a table is renamed
       if (!isRename_) {
-        // table is not renamed, need to invalidate
-        if (!invalidateCatalogTable()) {
+        // table is not renamed, need to refresh the table if its loaded
+        if (!reloadTableFromCatalog("ALTER_TABLE", false)) {
           if (wasEventSyncTurnedOn()) {
             // we received this alter table event on a non-existing table. We also
             // detect that event sync was turned on in this event. This may mean that
@@ -980,11 +932,6 @@
                     + "continued further. Issue a invalidate metadata command to reset "
                     + "the event processing state", getFullyQualifiedTblName()));
           }
-          debugLog("Table {} does not need to be "
-                  + "invalidated since it does not exist anymore",
-              getFullyQualifiedTblName());
-        } else {
-          infoLog("Table {} is invalidated", getFullyQualifiedTblName());
         }
         return;
       }
@@ -1025,8 +972,7 @@
       return false;
     }
 
-    @Override
-    protected boolean canBeSkipped() {
+    private boolean canBeSkipped() {
       // Certain alter events just modify some parameters such as
       // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
       // along with insert events. Check if the alter table event is such a trivial
@@ -1049,7 +995,7 @@
      * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code>. If the
      * parameter is unchanged, it doesn't
      * matter if you use the before or after table object here since the eventual action
-     * is going be invalidate or rename. If however, the parameter is changed, couple of
+     * is going be refresh or rename. If however, the parameter is changed, couple of
      * things could happen. The flag changes from unset/false to true or it changes from
      * true to false/unset. In the first case, we want to process the event (and ignore
      * subsequent events on this table). In the second case, we should process the event
@@ -1097,6 +1043,12 @@
       }
     }
 
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("self-event evaluation is not needed for "
+          + "this event type");
+    }
+
     /**
      * Process the drop table event type. If the table from the event doesn't exist in the
      * catalog, ignore the event. If the table exists in the catalog, compares the
@@ -1121,7 +1073,7 @@
       if (removedTable != null) {
         infoLog("Removed table {} ", getFullyQualifiedTblName());
       } else if (!tblWasFound.getRef()) {
-        debugLog("Table {} was not removed since it did not exist in catalog.",
+        debugLog("Table {} was not removed since it does not exist in catalog anymore.",
             tblName_);
       } else if (!tblMatched.getRef()) {
         infoLog(debugString("Table %s was not removed from "
@@ -1161,6 +1113,12 @@
       }
     }
 
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is unnecessary for"
+          + " this event type");
+    }
+
     /**
      * Processes the create database event by adding the Db object from the event if it
      * does not exist in the catalog already.
@@ -1245,21 +1203,8 @@
     }
 
     @Override
-    protected void initSelfEventIdentifiersFromEvent() {
-      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
-          alteredDatabase_.getParameters(),
-          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-      serviceIdFromEvent_ = getStringProperty(
-          alteredDatabase_.getParameters(),
-          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
-      try {
-        pendingVersionNumbersFromCatalog_ =
-            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
-      } catch (DatabaseNotFoundException | TableNotFoundException e) {
-        // ok to ignore this exception, since if the db doesn't exit, this event needs
-        // to be ignored anyways
-        debugLog("Received exception {}. Ignoring self-event evaluation", e.getMessage());
-      }
+    protected SelfEventContext getSelfEventContext() {
+      return new SelfEventContext(dbName_, null, alteredDatabase_.getParameters());
     }
   }
 
@@ -1294,6 +1239,12 @@
       }
     }
 
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is not needed for "
+          + "this event");
+    }
+
     /**
      * Process the drop database event. This handler removes the db object from catalog
      * only if the CREATION_TIME of the catalog's database object is lesser than or equal
@@ -1332,91 +1283,36 @@
   }
 
   /**
-   * MetastoreEvent for which issues invalidate on a table from the event
+   * Returns a list of parameters that are set by Hive for tables/partitions that can be
+   * ignored to determine if the alter table/partition event is a trivial one.
    */
-  public static abstract class TableInvalidatingEvent extends MetastoreTableEvent {
-    /**
-     * Prevent instantiation from outside should use MetastoreEventFactory instead
-     */
-    private TableInvalidatingEvent(
-        CatalogServiceCatalog catalog, Metrics metrics, NotificationEvent event) {
-      super(catalog, metrics, event);
-    }
+  @VisibleForTesting
+  static final List<String> parametersToIgnore =
+      new ImmutableList.Builder<String>()
+      .add("transient_lastDdlTime")
+      .add("totalSize")
+      .add("numFilesErasureCoded")
+      .add("numFiles")
+      .build();
 
-    /**
-     * Issues a invalidate table on the catalog on the table from the event. This
-     * invalidate does not fetch information from metastore unlike the invalidate metadata
-     * command since event is triggered post-metastore activity. This handler invalidates
-     * by atomically removing existing loaded table and replacing it with a
-     * IncompleteTable. If the table doesn't exist in catalog this operation is a no-op
-     */
-    @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
-      // skip event processing in case its a self-event
-      if (isSelfEvent()) {
-        infoLog("Not processing the event as it is a self-event");
-        return;
-      }
-
-      // Skip if it's only a change in parameters by Hive, which can be ignored.
-      if (canBeSkipped()) {
-        infoLog("Not processing this event as it only modifies some "
-            + "parameters which can be ignored.");
-        return;
-      }
-
-      if (invalidateCatalogTable()) {
-        infoLog("Table {} is invalidated", getFullyQualifiedTblName());
+  /**
+   * Util method that sets the parameters that can be ignored equal before and after
+   * event.
+   */
+  private static void setTrivialParameters(Map<String, String> parametersBefore,
+      Map<String, String> parametersAfter) {
+    for (String parameter: parametersToIgnore) {
+      String val = parametersBefore.get(parameter);
+      if (val == null) {
+        parametersAfter.remove(parameter);
       } else {
-        debugLog("Table {} does not need to be invalidated since "
-            + "it does not exist anymore", getFullyQualifiedTblName());
+        parametersAfter.put(parameter, val);
       }
     }
-
-    protected static String getStringProperty(
-        Map<String, String> params, String key, String defaultVal) {
-      if (params == null) return defaultVal;
-      return params.getOrDefault(key, defaultVal);
-    }
-
-    /**
-     * Returns a list of parameters that are set by Hive for tables/partitions that can be
-     * ignored to determine if the alter table/partition event is a trivial one.
-     */
-    static final List<String> parametersToIgnore = new ImmutableList.Builder<String>()
-        .add("transient_lastDdlTime")
-        .add("totalSize")
-        .add("numFilesErasureCoded")
-        .add("numFiles")
-        .build();
-
-    /**
-     * Util method that sets the parameters that can be ignored equal before and after
-     * event.
-     */
-     static void setTrivialParameters(Map<String, String> parametersBefore,
-        Map<String, String> parametersAfter) {
-      for (String parameter: parametersToIgnore) {
-        String val = parametersBefore.get(parameter);
-        if (val == null) {
-          parametersAfter.remove(parameter);
-        } else {
-          parametersAfter.put(parameter, val);
-        }
-      }
-    }
-
-    /**
-     * Hive generates certain trivial alter events for eg: change only
-     * "transient_lastDdlTime". This method returns true if the alter partition event is
-     * such a trivial event.
-     */
-    protected abstract boolean canBeSkipped();
   }
-
-  public static class AddPartitionEvent extends TableInvalidatingEvent {
-    private Partition lastAddedPartition_;
+  public static class AddPartitionEvent extends MetastoreTableEvent {
     private final List<Partition> addedPartitions_;
+    private final List<List<TPartitionKeyValue>> partitionKeyVals_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -1436,21 +1332,30 @@
             Lists.newArrayList(addPartitionMessage_.getPartitionObjs());
         // it is possible that the added partitions is empty in certain cases. See
         // IMPALA-8847 for example
-        if (!addedPartitions_.isEmpty()) {
-          // when multiple partitions are added in HMS they are all added as one
-          // transaction Hence all the partitions which are present in the message must
-          // have the same serviceId and version if it is set. hence it is fine to just
-          // look at the last added partition in the list and use it for the self-event
-          // ids
-          lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
-        }
         msTbl_ = addPartitionMessage_.getTableObj();
+        partitionKeyVals_ = new ArrayList<>(addedPartitions_.size());
+        for (Partition part : addedPartitions_) {
+          partitionKeyVals_.add(getTPartitionSpecFromHmsPartition(msTbl_, part));
+        }
       } catch (Exception ex) {
         throw new MetastoreNotificationException(ex);
       }
     }
 
     @Override
+    public SelfEventContext getSelfEventContext() {
+      Map<String, String> params = new HashMap<>();
+      // all the partitions are added as one transaction and hence we expect all the
+      // added partitions to have the same catalog service identifiers. Using the first
+      // one for the params is enough for the purpose of self-event evaluation
+      if (!addedPartitions_.isEmpty()) {
+        params.putAll(addedPartitions_.get(0).getParameters());
+      }
+      return new SelfEventContext(dbName_, tblName_, partitionKeyVals_,
+          params);
+    }
+
+    @Override
     public void process() throws MetastoreNotificationException, CatalogException {
       // bail out early if there are not partitions to process
       if (addedPartitions_.isEmpty()) {
@@ -1464,9 +1369,8 @@
       try {
         // Reload the whole table if it's a transactional table.
         if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
-          reloadTableFromCatalog("ADD_PARTITION");
+          reloadTableFromCatalog("ADD_PARTITION", true);
         } else {
-          boolean success = true;
           // HMS adds partitions in a transactional way. This means there may be multiple
           // HMS partition objects in an add_partition event. We try to do the same here
           // by refreshing all those partitions in a loop. If any partition refresh fails,
@@ -1474,58 +1378,23 @@
           // refresh of the partitions if the table is not present in the catalog.
           infoLog("Trying to refresh {} partitions added to table {} in the event",
               addedPartitions_.size(), getFullyQualifiedTblName());
+          //TODO refresh all the partition together instead of looping one by one
           for (Partition partition : addedPartitions_) {
             List<TPartitionKeyValue> tPartSpec =
                 getTPartitionSpecFromHmsPartition(msTbl_, partition);
-            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-                "processing ADD_PARTITION event from HMS")) {
-              debugLog("Refresh partitions on table {} failed as table was not present " +
-                  "in the catalog.", getFullyQualifiedTblName());
-              success = false;
-              break;
-            }
-          }
-          if (success) {
-            infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
-                getFullyQualifiedTblName());
+            if (!reloadPartition(tPartSpec, "ADD_PARTITION")) break;
           }
         }
-      } catch (DatabaseNotFoundException e) {
-        debugLog("Refresh partitions on table {} after add_partitions event failed as "
-                + "the database was not present in the catalog.",
-            getFullyQualifiedTblName());
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
                 + "refresh newly added partitions of table {}. Event processing cannot "
-                + "continue. Issue an invalidate command to reset event processor.",
-            getFullyQualifiedTblName()), e);
+                + "continue. Issue an invalidate metadata command to reset event "
+                + "processor.", getFullyQualifiedTblName()), e);
       }
     }
-
-    @Override
-    protected void initSelfEventIdentifiersFromEvent() {
-      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
-          lastAddedPartition_.getParameters(),
-          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-      serviceIdFromEvent_ = getStringProperty(
-          lastAddedPartition_.getParameters(),
-          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
-      try {
-        pendingVersionNumbersFromCatalog_ =
-            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
-      } catch (DatabaseNotFoundException | TableNotFoundException e) {
-        // ok to ignore this exception, since if the db doesn't exit, this event needs
-        // to be ignored anyways
-        debugLog("Received exception {}. Ignoring self-event evaluation",
-            e.getMessage());
-      }
-    }
-
-    @Override
-    protected boolean canBeSkipped() { return false; }
   }
 
-  public static class AlterPartitionEvent extends TableInvalidatingEvent {
+  public static class AlterPartitionEvent extends MetastoreTableEvent {
     // the Partition object before alter operation, as parsed from the NotificationEvent
     private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore_;
     // the Partition object after alter operation, as parsed from the NotificationEvent
@@ -1572,42 +1441,25 @@
 
       // Reload the whole table if it's a transactional table.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
-        reloadTableFromCatalog("ALTER_PARTITION");
+        reloadTableFromCatalog("ALTER_PARTITION", true);
       } else {
         // Refresh the partition that was altered.
         Preconditions.checkNotNull(partitionAfter_);
         List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
             partitionAfter_);
         try {
-          // Ignore event if table or database is not in catalog. Throw exception if
-          // refresh fails.
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-              "processing ALTER_PARTITION event from HMS")) {
-            debugLog("Refresh of table {} partition {} failed as the table "
-                    + "is not present in the catalog.", getFullyQualifiedTblName(),
-                constructPartitionStringFromTPartitionSpec(tPartSpec));
-          } else {
-            infoLog("Table {} partition {} has been refreshed",
-                getFullyQualifiedTblName(),
-                constructPartitionStringFromTPartitionSpec(tPartSpec));
-          }
-        } catch (DatabaseNotFoundException e) {
-          debugLog("Refresh of table {} partition {} "
-                  + "event failed as the database is not present in the catalog.",
-              getFullyQualifiedTblName(),
-              constructPartitionStringFromTPartitionSpec(tPartSpec));
+          reloadPartition(tPartSpec, "ALTER_PARTITION");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                   + "partition on table {} partition {} failed. Event processing cannot "
-                  + "continue. Issue and invalidate command to reset the event processor "
+                  + "continue. Issue an invalidate command to reset the event processor "
                   + "state.", getFullyQualifiedTblName(),
-              constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
+              HdfsTable.constructPartitionName(tPartSpec)), e);
         }
       }
     }
 
-    @Override
-    protected boolean canBeSkipped() {
+    private boolean canBeSkipped() {
       // Certain alter events just modify some parameters such as
       // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
       // along with insert events. Check if the alter table event is such a trivial
@@ -1622,26 +1474,14 @@
     }
 
     @Override
-    protected void initSelfEventIdentifiersFromEvent() {
-      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
-          partitionAfter_.getParameters(),
-          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-      serviceIdFromEvent_ = getStringProperty(
-          partitionAfter_.getParameters(),
-          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
-      try {
-        pendingVersionNumbersFromCatalog_ =
-            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
-      } catch (DatabaseNotFoundException | TableNotFoundException e) {
-        // ok to ignore since if the db or table doesn't exist the event is ignored
-        // anyways
-        debugLog("Received exception {}. Ignoring self-event evaluation",
-            e.getMessage());
-      }
+    public SelfEventContext getSelfEventContext() {
+      return new SelfEventContext(dbName_, tblName_,
+          Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, partitionAfter_)),
+          partitionAfter_.getParameters());
     }
   }
 
-  public static class DropPartitionEvent extends TableInvalidatingEvent {
+  public static class DropPartitionEvent extends MetastoreTableEvent {
     private final List<Map<String, String>> droppedPartitions_;
 
     /**
@@ -1669,62 +1509,44 @@
     }
 
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void process() throws MetastoreNotificationException, CatalogException {
       // we have seen cases where a add_partition event is generated with empty
       // partition list (see IMPALA-8547 for details. Make sure that droppedPartitions
       // list is not empty
       if (droppedPartitions_.isEmpty()) {
         infoLog("Partition list is empty. Ignoring this event.");
       }
-      // We do not need self event as dropPartition() call is a no-op if the directory
-      // doesn't exist.
       try {
         // Reload the whole table if it's a transactional table.
         if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
-          reloadTableFromCatalog("DROP_PARTITION");
+          reloadTableFromCatalog("DROP_PARTITION", true);
         } else {
-          boolean success = true;
           // We refresh all the partitions that were dropped from HMS. If a refresh
           // fails, we throw a MetastoreNotificationNeedsInvalidateException
-          infoLog("{} partitions dropped from table {}. Trying "
-              + "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
+          infoLog("{} partitions dropped from table {}. Refreshing the partitions "
+                  + "to remove them from catalog.", droppedPartitions_.size(),
+              getFullyQualifiedTblName());
           for (Map<String, String> partSpec : droppedPartitions_) {
             List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
             for (Map.Entry<String, String> entry : partSpec.entrySet()) {
               tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
             }
-            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-                "processing DROP_PARTITION event from HMS")) {
-              debugLog("Could not refresh partition {} of table {} as table "
-                      + "was not present in the catalog.",
-                      getFullyQualifiedTblName());
-              success = false;
-              break;
-            }
-          }
-          if (success) {
-            infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
-                getFullyQualifiedTblName());
+            if (!reloadPartition(tPartSpec, "DROP_PARTITION")) break;
           }
         }
-      } catch (DatabaseNotFoundException e) {
-        debugLog("Could not refresh partitions of table {}"
-            + "as database was not present in the catalog.", getFullyQualifiedTblName());
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
             + "drop some partitions from table {} after a drop partitions event. Event "
-                + "processing cannot continue. Issue an invalidate command to reset "
-            + "event processor state.", getFullyQualifiedTblName()), e);
+            + "processing cannot continue. Issue an invalidate metadata command to "
+            + "reset event processor state.", getFullyQualifiedTblName()), e);
       }
     }
 
     @Override
-    protected void initSelfEventIdentifiersFromEvent() {
-      // no-op
+    protected SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("self-event evaluation is not needed for "
+          + "this event type");
     }
-
-    @Override
-    protected boolean canBeSkipped() { return false; }
   }
 
   /**
@@ -1742,12 +1564,19 @@
 
     @Override
     public void process() {
-      debugLog("Ignored");
+      debugLog(
+          "Ignoring unknown event type " + metastoreNotificationEvent_.getEventType());
     }
 
     @Override
     protected boolean isEventProcessingDisabled() {
       return false;
     }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is not needed for "
+          + "this event type");
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index ac50562..8b9bba3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -204,8 +204,10 @@
   public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
   // metric name which counts the number of self-events which are skipped
   public static final String NUMBER_OF_SELF_EVENTS = "self-events-skipped";
-  // metric name for number of tables which are invalidated by event processor so far
-  public static final String NUMBER_OF_TABLE_INVALIDATES = "tables-invalidated";
+  // metric name for number of tables which are refreshed by event processor so far
+  public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
+  // number of times events processor refreshed a partition
+  public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed";
 
   // possible status of event processor
   public enum EventProcessorStatus {
@@ -311,7 +313,8 @@
     metrics_.addGauge(LAST_SYNCED_ID_METRIC,
         (Gauge<Long>) () -> lastSyncedEventId_.get());
     metrics_.addCounter(NUMBER_OF_SELF_EVENTS);
-    metrics_.addCounter(NUMBER_OF_TABLE_INVALIDATES);
+    metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
+    metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
   }
 
   /**
@@ -334,8 +337,7 @@
   /**
    * Gets the current event processor status
    */
-  @VisibleForTesting
-  EventProcessorStatus getStatus() {
+  public EventProcessorStatus getStatus() {
     return eventProcessorStatus_;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
new file mode 100644
index 0000000..83aa69b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
+import org.apache.impala.thrift.TPartitionKeyValue;
+
+/**
+ * Helper class which encapsulates all the information needed to evaluate if given
+ * event is self-event or not
+ */
+public class SelfEventContext {
+  private final String dbName_;
+  private final String tblName_;
+  // version number from the event object parameters used for self-event detection
+  private final long versionNumberFromEvent_;
+  // service id from the event object parameters used for self-event detection
+  private final String serviceIdFromEvent_;
+  private final List<List<TPartitionKeyValue>> partitionKeyValues_;
+
+  SelfEventContext(String dbName, String tblName,
+      Map<String, String> parameters) {
+    this(dbName, tblName, null, parameters);
+  }
+
+  /**
+   * Creates a self-event context for self-event evaluation for database, table or
+   * partition events.
+   *
+   * @param dbName Database name
+   * @param tblName Table name
+   * @param partitionKeyValues Partition key-values in case of self-event
+   * context is for partition.
+   * @param parameters this could be database, table or partition parameters.
+   */
+  SelfEventContext(String dbName, @Nullable String tblName,
+      @Nullable List<List<TPartitionKeyValue>> partitionKeyValues,
+      Map<String, String> parameters) {
+    Preconditions.checkNotNull(parameters);
+    this.dbName_ = Preconditions.checkNotNull(dbName);
+    this.tblName_ = tblName;
+    this.partitionKeyValues_ = partitionKeyValues;
+    versionNumberFromEvent_ = Long.parseLong(
+        MetastoreEvents.getStringProperty(parameters,
+            MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
+    serviceIdFromEvent_ =
+        MetastoreEvents.getStringProperty(parameters,
+            MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
+  }
+
+  public String getDbName() {
+    return dbName_;
+  }
+
+  public String getTblName() {
+    return tblName_;
+  }
+
+  public long getVersionNumberFromEvent() {
+    return versionNumberFromEvent_;
+  }
+
+  public String getServiceIdFromEvent() {
+    return serviceIdFromEvent_;
+  }
+
+  public List<List<TPartitionKeyValue>> getPartitionKeyValues() {
+    return partitionKeyValues_ == null ?
+     null : Collections.unmodifiableList(partitionKeyValues_);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 14b67df..8c52ea8 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -583,7 +583,7 @@
         try {
           alterTableOrViewRename(tbl,
               TableName.fromThrift(params.getRename_params().getNew_table_name()),
-              response);
+              newCatalogVersion, response);
           return;
         } finally {
           // release the version taken in the tryLock call above
@@ -624,6 +624,12 @@
           refreshedTable = alterTableAddPartitions(tbl, params.getAdd_partition_params());
           if (refreshedTable != null) {
             refreshedTable.setCatalogVersion(newCatalogVersion);
+            // the alter table event is only generated when we add the partition. For
+            // instance if not exists clause is provided and the partition is
+            // pre-existing there is no alter table event generated. Hence we should
+            // only add the versions for in-flight events when we are sure that the
+            // partition was really added.
+            catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
             addTableToCatalogUpdate(refreshedTable, response.result);
           }
           reloadMetadata = false;
@@ -655,6 +661,10 @@
               dropPartParams.isPurge(), numUpdatedPartitions);
           if (refreshedTable != null) {
             refreshedTable.setCatalogVersion(newCatalogVersion);
+            // we don't need to add catalog versions in partition's InflightEvents here
+            // since by the time the event is received, the partition is already
+            // removed from catalog and there is nothing to compare against during
+            // self-event evaluation
             addTableToCatalogUpdate(refreshedTable, response.result);
           }
           addSummary(response,
@@ -757,11 +767,11 @@
       if (reloadMetadata) {
         loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
             reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
+        // now that HMS alter operation has succeeded, add this version to list of
+        // inflight events in catalog table if event processing is enabled
+        catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
         addTableToCatalogUpdate(tbl, response.result);
       }
-      // now that HMS alter operation has succeeded, add this version to list of inflight
-      // events in catalog table if event processing is enabled
-      catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
     } finally {
       context.stop();
       UnlockWriteLockIfErronouslyLocked();
@@ -874,7 +884,7 @@
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(
           msClient.getHiveClient(), partitions);
-      for (HdfsPartition hdfsPartition: hdfsPartitions) {
+      for (HdfsPartition hdfsPartition : hdfsPartitions) {
         catalog_.addPartition(hdfsPartition);
       }
     }
@@ -938,7 +948,7 @@
    */
   private void addCatalogServiceIdentifiers(Table tbl, String catalogServiceId,
       long newCatalogVersion) {
-    if (!catalog_.isExternalEventProcessingEnabled()) return;
+    if (!catalog_.isEventProcessingActive()) return;
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable();
     msTbl.putToParameters(
         MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
@@ -2938,7 +2948,7 @@
    * reloaded on the next access.
    */
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
-      TDdlExecResponse response) throws ImpalaException {
+      long newCatalogVersion, TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
         && catalog_.getLock().isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
@@ -2985,7 +2995,7 @@
           "'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
           newTableName.toString()));
     }
-
+    catalog_.addVersionsForInflightEvents(result.second, newCatalogVersion);
     // TODO(todd): if client is a 'v2' impalad, only send back invalidation
     response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
     response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
@@ -3515,14 +3525,14 @@
   }
 
   /**
-   * Adds this catalog service id and the given catalog version to the partition
-   * parameters from table parameters. No-op if event processing is disabled
+   * No-op if event processing is disabled. Adds this catalog service id and the given
+   * catalog version to the partition parameters from table parameters.
    */
   private void addCatalogServiceIdentifiers(
       org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
-    if (!catalog_.isExternalEventProcessingEnabled())
-      return;
+    if (!catalog_.isEventProcessingActive()) return;
     Preconditions.checkState(msTbl.isSetParameters());
+    Preconditions.checkNotNull(partition, "Partition is null");
     Map<String, String> tblParams = msTbl.getParameters();
     Preconditions
         .checkState(tblParams.containsKey(
@@ -3534,12 +3544,39 @@
             MetastoreEventPropertyKey.CATALOG_VERSION.getKey()),
             "Table parameters must contain catalog version before adding "
                 + "it to partition parameters");
-    partition.putToParameters(
-        MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
-        tblParams.get(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey()));
-    partition.putToParameters(
-        MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
-        tblParams.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey()));
+    // make sure that the service id from the table matches with our own service id to
+    // avoid issues where the msTbl has an older (other catalogs' service identifiers)
+    String serviceIdFromTbl =
+        tblParams.get(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey());
+    String version = tblParams.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey());
+    if (catalog_.getCatalogServiceId().equals(serviceIdFromTbl)) {
+      partition.putToParameters(
+          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), serviceIdFromTbl);
+      partition.putToParameters(
+          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), version);
+    }
+  }
+
+  /**
+   * This method extracts the catalog version from the tbl parameters and adds it to
+   * the HdfsPartition's inflight events. This information is used by event
+   * processor to skip the event generated on the partition.
+   */
+  private void addToInflightVersionsOfPartition(
+      Map<String, String> partitionParams, HdfsPartition hdfsPartition) {
+    if (!catalog_.isEventProcessingActive()) return;
+    Preconditions.checkState(partitionParams != null);
+    String version = partitionParams
+        .get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey());
+    String serviceId = partitionParams
+        .get(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey());
+
+    // make sure that we are adding the catalog version from our own instance of
+    // catalog service identifiers
+    if (catalog_.getCatalogServiceId().equals(serviceId)) {
+      Preconditions.checkNotNull(version);
+      hdfsPartition.addToVersionsForInflightEvents(Long.parseLong(version));
+    }
   }
 
   /**
@@ -3660,6 +3697,7 @@
       addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), hmsPartition);
       applyAlterHmsPartitions(tbl.getMetaStoreTable().deepCopy(), msClient,
           tbl.getTableName(), Arrays.asList(hmsPartition));
+      addToInflightVersionsOfPartition(hmsPartition.getParameters(), partition);
     }
   }
 
@@ -3829,7 +3867,13 @@
           for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
               hmsPartitionsSubList) {
             try {
-              catalog_.getHdfsPartition(dbName, tableName, msPartition).markDirty();
+              HdfsPartition hdfsPartition = catalog_.getHdfsPartition(dbName, tableName,
+                  msPartition);
+              hdfsPartition.markDirty();
+              // if event processing is turned on add the version number from partition
+              // paramters to the HdfsPartition's list of in-flight events
+              addToInflightVersionsOfPartition(msPartition.getParameters(),
+                  hdfsPartition);
             } catch (PartitionNotFoundException e) {
               LOG.error(String.format("Partition of table %s could not be found: %s",
                   tableName, e.getMessage()));
@@ -3963,8 +4007,11 @@
                 boolean isTransactional = AcidUtils.isTransactionalTable(
                     tbl.getMetaStoreTable().getParameters());
                 Preconditions.checkArgument(!isTransactional);
+                Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
+                // TODO if the partition was not really refreshed because the partSpec
+                // was wrong, do we still need to send back the table?
                 updatedThriftTable = catalog_.reloadPartition(tbl,
-                    req.getPartition_spec(), cmdString);
+                    req.getPartition_spec(), wasPartitionRefreshed, cmdString);
               } else {
                 // TODO IMPALA-8809: Optimisation for partitioned tables:
                 //   1: Reload the whole table if schema change happened. Identify
@@ -4097,6 +4144,8 @@
       List<FeFsPartition> affectedExistingPartitions = new ArrayList<>();
       List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitionsStatsUnset =
           Lists.newArrayList();
+      addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       if (table.getNumClusteringCols() > 0) {
         // Set of all partition names targeted by the insert that need to be created
         // in the Metastore (partitions that do not currently exist in the catalog).
@@ -4157,6 +4206,7 @@
               partition.getSd().setSerdeInfo(msTbl.getSd().getSerdeInfo().deepCopy());
               partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" +
                   partName.substring(0, partName.length() - 1));
+              addCatalogServiceIdentifiers(msTbl, partition);
               MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse);
             }
 
@@ -4287,7 +4337,7 @@
    */
   private void createInsertEvents(Table table,
       List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
-    if (!catalog_.isExternalEventProcessingEnabled() ||
+    if (!catalog_.isEventProcessingActive() ||
         affectedExistingPartitions.size() == 0) return;
 
     // Map of partition names to file names of all existing partitions touched by the
@@ -4429,6 +4479,9 @@
       throw new CatalogException("Database: " + dbName + " does not exist.");
     }
     synchronized (metastoreDdlLock_) {
+      // Get a new catalog version to assign to the database being altered.
+      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
       Database msDb = db.getMetaStoreDb().deepCopy();
       msDb.setDescription(comment);
       try {
@@ -4438,6 +4491,9 @@
       }
       Db updatedDb = catalog_.updateDb(msDb);
       addDbToCatalogUpdate(updatedDb, response.result);
+      // now that HMS alter operation has succeeded, add this version to list of inflight
+      // events in catalog database if event processing is enabled
+      catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
     }
     addSummary(response, "Updated database.");
   }
@@ -4498,7 +4554,7 @@
    */
   private void addCatalogServiceIdentifiers(
       Db db, String catalogServiceId, long newCatalogVersion) {
-    if (!catalog_.isExternalEventProcessingEnabled()) return;
+    if (!catalog_.isEventProcessingActive()) return;
     org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
     msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
         catalogServiceId);
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index e486111..3f44312 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -395,7 +395,7 @@
     List<TPartitionKeyValue> partitionSpec = ImmutableList.of(
         new TPartitionKeyValue("year", "2010"),
         new TPartitionKeyValue("month", "10"));
-    catalog_.reloadPartition(table, partitionSpec, "test");
+    catalog_.reloadPartition(table, partitionSpec, new Reference<>(false), "test");
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
 
     // Loading or reloading an unpartitioned table with some files in it should not make
@@ -416,7 +416,7 @@
         .getPartitionFromThriftPartitionSpec(partitionSpec);
     hdfsPartition.setFileDescriptors(new ArrayList<>());
     stats.reset();
-    catalog_.reloadPartition(table, partitionSpec, "test");
+    catalog_.reloadPartition(table, partitionSpec, new Reference<>(false), "test");
 
     // Should not scan the directory file-by-file, should use a single
     // listLocatedStatus() to get the whole directory (partition)
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 801358b..6b4dbb9 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -82,7 +82,6 @@
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
-import org.apache.impala.catalog.events.MetastoreEvents.TableInvalidatingEvent;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -107,6 +106,7 @@
 import org.apache.impala.thrift.TAlterTableSetRowFormatParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TCreateDbParams;
@@ -124,11 +124,13 @@
 import org.apache.impala.thrift.TOwnerType;
 import org.apache.impala.thrift.TPartitionDef;
 import org.apache.impala.thrift.TPartitionKeyValue;
+import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TPrimitiveType;
 import org.apache.impala.thrift.TScalarType;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTablePropertyType;
 import org.apache.impala.thrift.TTableRowFormat;
+import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUniqueId;
@@ -683,7 +685,7 @@
             catalog_.getTable(TEST_DB_NAME, testTblName));
     FeFsPartition singlePartitionAfterTrivialAlter =
         Iterables.getOnlyElement(partsAfterTrivialAlter);
-    for (String parameter : TableInvalidatingEvent.parametersToIgnore) {
+    for (String parameter : MetastoreEvents.parametersToIgnore) {
       assertEquals("Unexpected parameter value after trivial alter partition "
           + "event", singlePartition.getParameters().get(parameter),
           singlePartitionAfterTrivialAlter.getParameters().get(parameter));
@@ -736,11 +738,6 @@
 
   /**
    * Util method to create empty files in a given path
-   * @param parentPath
-   * @param fileNamePrefix
-   * @param totalNumberOfFilesToAdd
-   * @return
-   * @throws IOException
    */
   private List<String> addFilesToDirectory(Path parentPath, String fileNamePrefix,
       int totalNumberOfFilesToAdd, boolean isOverwrite) throws IOException {
@@ -858,8 +855,8 @@
     eventsProcessor_.processEvents();
     // simulate the table being loaded by explicitly calling load table
     loadTable("old_name");
-    long numberOfInvalidatesBefore = eventsProcessor_.getMetrics()
-        .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_INVALIDATES).getCount();
+    long numOfRefreshesBefore = eventsProcessor_.getMetrics()
+        .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).getCount();
 
     // test renaming a table from outside aka metastore client
     alterTableRename("old_name", testTblName, null);
@@ -895,43 +892,43 @@
     loadTable(testTblName);
     alterTableAddParameter(testTblName, "somekey", "someval");
     eventsProcessor_.processEvents();
-    assertTrue("Table should be incomplete after alter table add parameter",
+    assertFalse("Table should have been refreshed after alter table add parameter",
         catalog_.getTable(TEST_DB_NAME, testTblName)
                 instanceof IncompleteTable);
     // check invalidate after alter table add col
     loadTable(testTblName);
     alterTableAddCol(testTblName, "newCol", "int", "null");
     eventsProcessor_.processEvents();
-    assertTrue("Table should have been invalidated after alter table add column",
+    assertFalse("Table should have been refreshed after alter table add column",
         catalog_.getTable(TEST_DB_NAME, testTblName)
                 instanceof IncompleteTable);
     // check invalidate after alter table change column type
     loadTable(testTblName);
     altertableChangeCol(testTblName, "newCol", "string", null);
     eventsProcessor_.processEvents();
-    assertTrue("Table should have been invalidated after changing column type",
+    assertFalse("Table should have been refreshed after changing column type",
         catalog_.getTable(TEST_DB_NAME, testTblName)
                 instanceof IncompleteTable);
     // check invalidate after alter table remove column
     loadTable(testTblName);
     alterTableRemoveCol(testTblName, "newCol");
     eventsProcessor_.processEvents();
-    assertTrue("Table should have been invalidated after removing a column",
+    assertFalse("Table should have been refreshed after removing a column",
         catalog_.getTable(TEST_DB_NAME, testTblName)
                 instanceof IncompleteTable);
     // 5 alters above. Each one of them except rename should increment the counter by 1
     long numberOfInvalidatesAfter = eventsProcessor_.getMetrics()
-        .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_INVALIDATES).getCount();
-    assertEquals("Unexpected number of table invalidates",
-        numberOfInvalidatesBefore + 4, numberOfInvalidatesAfter);
+        .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).getCount();
+    assertEquals("Unexpected number of table refreshes",
+        numOfRefreshesBefore + 4, numberOfInvalidatesAfter);
     // Check if trivial alters are ignored.
     loadTable(testTblName);
     alterTableChangeTrivialProperties(testTblName);
-    // The above alter should not cause an invalidate.
+    // The above alter should not cause a refresh.
     long numberOfInvalidatesAfterTrivialAlter = eventsProcessor_.getMetrics()
-        .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_INVALIDATES).getCount();
-    assertEquals("Unexpected number of table invalidates after trivial alter",
-        numberOfInvalidatesBefore + 4, numberOfInvalidatesAfterTrivialAlter);
+        .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).getCount();
+    assertEquals("Unexpected number of table refreshes after trivial alters",
+        numOfRefreshesBefore + 4, numberOfInvalidatesAfterTrivialAlter);
 
     // Simulate rename and drop sequence for table/db.
     String tblName = "alter_drop_test";
@@ -2042,6 +2039,11 @@
 
   private abstract class AlterTableExecutor {
     protected abstract void execute() throws Exception;
+
+    protected long getNumTblsRefreshed() {
+      return eventsProcessor_.getMetrics()
+          .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).getCount();
+    }
   }
 
   private class HiveAlterTableExecutor extends AlterTableExecutor {
@@ -2055,21 +2057,33 @@
     }
 
     public void execute() throws Exception {
+      Table tblBefore = Preconditions.checkNotNull(catalog_.getTable(TEST_DB_NAME,
+          tblName_));
+      boolean incompleteBefore = tblBefore instanceof IncompleteTable;
       if (toggle_.get()) {
         alterTableAddCol(tblName_, colName_, colType_, "");
-        verify();
       } else {
         alterTableRemoveCol(tblName_, colName_);
-        verify();
       }
+      verify(incompleteBefore);
       toggle_.compareAndSet(toggle_.get(), !toggle_.get());
     }
 
-    private void verify() throws Exception {
+    private void verify(boolean wasTblIncompleteBefore) throws Exception {
+      long numTblsRefreshedBefore = getNumTblsRefreshed();
       eventsProcessor_.processEvents();
       Table catTable = catalog_.getTable(TEST_DB_NAME, tblName_);
       assertNotNull(catTable);
-      assertTrue(catTable instanceof IncompleteTable);
+      if (wasTblIncompleteBefore) {
+        assertTrue("Table should not reloaded if its already incomplete",
+            catTable instanceof IncompleteTable);
+        assertTrue(numTblsRefreshedBefore == getNumTblsRefreshed());
+      } else {
+        assertFalse("Table should have been reloaded if its loaded before",
+            catTable instanceof IncompleteTable);
+        assertTrue(numTblsRefreshedBefore < getNumTblsRefreshed());
+      }
+
     }
   }
 
@@ -2087,19 +2101,21 @@
     public void execute() throws Exception {
       if (toggle_.get()) {
         alterTableAddColsFromImpala(TEST_DB_NAME, tblName_, colName_, colType_);
-        verify();
       } else {
         alterTableRemoveColFromImpala(TEST_DB_NAME, tblName_, colName_);
-        verify();
       }
+      verify();
       toggle_.compareAndSet(toggle_.get(), !toggle_.get());
     }
 
     public void verify() throws Exception {
+      long numTblsRefreshedBefore = getNumTblsRefreshed();
       eventsProcessor_.processEvents();
       Table catTable = catalog_.getTable(TEST_DB_NAME, tblName_);
       assertNotNull(catTable);
       assertFalse(catTable instanceof IncompleteTable);
+      // this is a self-event, table should not be refreshed
+      assertTrue(numTblsRefreshedBefore == getNumTblsRefreshed());
     }
   }
 
@@ -2182,13 +2198,41 @@
 
     eventsProcessor_.processEvents();
     Table catalogTbl = catalog_.getTable(TEST_DB_NAME, testTblName);
-    assertFalse(
-        "Table should not be invalidated after process events as it is a self-event.",
-        catalogTbl instanceof IncompleteTable);
-    hdfsPartition =
-        catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals);
-    assertNotNull(hdfsPartition.getParameters());
-    assertEquals("dummyValue1", hdfsPartition.getParameters().get("dummyKey1"));
+    confirmTableIsLoaded(TEST_DB_NAME, testTblName);
+    // we check for the object hash of the HDFSPartition to make sure that it was not
+    // refresh
+    assertEquals("Partition should not have been refreshed after receiving "
+            + "self-event", hdfsPartition,
+        catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals));
+
+    // compute stats on the table and make sure that the table and its partittions are
+    // not refreshed due to the events
+    alterTableComputeStats(testTblName, Arrays.asList(Arrays.asList("1"),
+        Arrays.asList("2")));
+    // currently there is no good way to find out if a partition was refreshed or not.
+    // When a partition is refreshed, we replace the HDFSPartition objects in the
+    // HDFSTable with the new ones which are reloaded from updated information in HMS.
+    // In order to detect whether the partitions were refreshed, we
+    // compare the HDFSPartition object before and after the events are
+    // processed to make sure that they are the same instance of HDFSPartition
+    HdfsPartition part1Before = catalog_.getHdfsPartition(TEST_DB_NAME, testTblName,
+        partKeyVals);
+    List<TPartitionKeyValue> partKeyVals2 = new ArrayList<>();
+    partKeyVals2.add(new TPartitionKeyValue("p1", "2"));
+    HdfsPartition part2Before = catalog_.getHdfsPartition(TEST_DB_NAME, testTblName,
+        partKeyVals2);
+    // we updated the stats on 2 partitions, we should see atleast 2 alter partition
+    // events
+    assertTrue(eventsProcessor_.getNextMetastoreEvents().size() >= 2);
+    eventsProcessor_.processEvents();
+    confirmTableIsLoaded(TEST_DB_NAME, testTblName);
+    // make sure that the partitions are the same instance
+    assertEquals("Partition should not have been refreshed after receiving the "
+            + "self-event", part1Before,
+        catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals));
+    assertEquals("Partition should not have been refreshed after receiving the "
+            + "self-event", part2Before,
+        catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals2));
   }
 
   private void createDatabase(String dbName, Map<String, String> params)
@@ -2611,6 +2655,34 @@
         "dummyValue1", catalogTbl.getMetaStoreTable().getParameters().get("dummyKey1"));
   }
 
+  private void alterTableComputeStats(String tblName, List<List<String>> partValsList)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.ALTER_TABLE);
+
+    TAlterTableParams alterTableParams = new TAlterTableParams();
+    alterTableParams.setAlter_type(TAlterTableType.UPDATE_STATS);
+    alterTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
+    req.setAlter_table_params(alterTableParams);
+
+    TAlterTableUpdateStatsParams updateStatsParams = new TAlterTableUpdateStatsParams();
+    TTableStats tTableStats = new TTableStats();
+    tTableStats.num_rows = 10;
+    tTableStats.total_file_bytes = 1000;
+    updateStatsParams.setTable_stats(tTableStats);
+    Map<List<String>, TPartitionStats> partitionStats = new HashMap<>();
+    for (List<String> partVals : partValsList) {
+      TPartitionStats partStats = new TPartitionStats();
+      partStats.stats = new TTableStats();
+      partStats.stats.num_rows = 6;
+      partitionStats.put(partVals, partStats);
+    }
+
+    updateStatsParams.setPartition_stats(partitionStats);
+
+    alterTableParams.setUpdate_stats_params(updateStatsParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
   /**
    * Set partition properties from Impala
    */
@@ -2703,7 +2775,7 @@
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTable =
           msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
-      for (String parameter : TableInvalidatingEvent.parametersToIgnore) {
+      for (String parameter : MetastoreEvents.parametersToIgnore) {
         msTable.getParameters().put(parameter, "1234567");
       }
       msClient.getHiveClient().alter_table_with_environmentContext(
@@ -2802,7 +2874,7 @@
     try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
       Partition partition = metaStoreClient.getHiveClient().getPartition(TEST_DB_NAME,
           tblName, partVal);
-      for (String parameter : TableInvalidatingEvent.parametersToIgnore) {
+      for (String parameter : MetastoreEvents.parametersToIgnore) {
         partition.getParameters().put(parameter, "12334567");
         partitions.add(partition);
       }
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 7d67901..e9d5c4f 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -15,12 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import random
+import string
 import pytest
 import json
 import time
 import requests
 
-from tests.common.environ import build_flavor_timeout
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
     SkipIfLocal, SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -61,23 +62,21 @@
       self.run_stmt_in_hive(
         "alter table functional_parquet.testBlackListedTbl add partition (part=1)")
       # wait until all the events generated above are processed
-      EventProcessorUtils.wait_for_event_processing(self.hive_client)
+      EventProcessorUtils.wait_for_event_processing(self)
       assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
       assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
     finally:
       self.run_stmt_in_hive("drop database testBlackListedDb cascade")
       self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl")
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   @SkipIfHive2.acid
   def test_transactional_insert_events(self):
     """Executes 'run_test_insert_events' for transactional tables.
     """
     self.run_test_insert_events(is_transactional=True)
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_insert_events(self):
     """Executes 'run_test_insert_events' for non-transactional tables.
     """
@@ -90,158 +89,314 @@
     Insert overwrite table --> for partitioned and non-partitioned table
     Insert into partition --> for partitioned table
     """
-    db_name = 'test_db'
+    db_name = self.__get_random_name("insert_event_db_")
+    tblproperties = self.__get_transactional_tblproperties(is_transactional)
     with HiveDbWrapper(self, db_name):
-     # Test table with no partitions.
-     TBL_INSERT_NOPART = 'tbl_insert_nopart'
-     self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_NOPART))
-     last_synced_event_id = self.get_last_synced_event_id()
-     TBLPROPERTIES = ""
-     if is_transactional:
-       TBLPROPERTIES = "TBLPROPERTIES ('transactional'='true'," \
-           "'transactional_properties'='insert_only')"
-     self.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
-         % (db_name, TBL_INSERT_NOPART, TBLPROPERTIES))
-     # Test insert into table, this will fire an insert event.
-     self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
-         % (db_name, TBL_INSERT_NOPART))
-     # With MetastoreEventProcessor running, the insert event will be processed. Query the
-     # table from Impala.
-     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Verify that the data is present in Impala.
-     data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
-     assert data.split('\t') == ['101', '200']
+      # Test table with no partitions.
+      test_tbl_name = 'tbl_insert_nopart'
+      self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, test_tbl_name))
+      self.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
+         % (db_name, test_tbl_name, tblproperties))
+      # Test insert into table, this will fire an insert event.
+      self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
+         % (db_name, test_tbl_name))
+      # With MetastoreEventProcessor running, the insert event will be processed. Query
+      # the table from Impala.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # Verify that the data is present in Impala.
+      data = self.execute_scalar("select * from %s.%s" % (db_name, test_tbl_name))
+      assert data.split('\t') == ['101', '200']
 
-     # Test insert overwrite. Overwrite the existing value.
-     last_synced_event_id = self.get_last_synced_event_id()
-     self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
-         % (db_name, TBL_INSERT_NOPART))
-     # Make sure the event has been processed.
-     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Verify that the data is present in Impala.
-     data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
-     assert data.split('\t') == ['101', '201']
+      # Test insert overwrite. Overwrite the existing value.
+      self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
+         % (db_name, test_tbl_name))
+      # Make sure the event has been processed.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # Verify that the data is present in Impala.
+      data = self.execute_scalar("select * from %s.%s" % (db_name, test_tbl_name))
+      assert data.split('\t') == ['101', '201']
 
-     # Test partitioned table.
-     last_synced_event_id = self.get_last_synced_event_id()
-     TBL_INSERT_PART = 'tbl_insert_part'
-     self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_PART))
-     self.run_stmt_in_hive("create table %s.%s (id int, name string) "
+      # Test partitioned table.
+      test_part_tblname = 'tbl_insert_part'
+      self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, test_part_tblname))
+      self.run_stmt_in_hive("create table %s.%s (id int, name string) "
          "partitioned by(day int, month int, year int) %s"
-         % (db_name, TBL_INSERT_PART, TBLPROPERTIES))
-     # Insert data into partitions.
-     self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
-         "values(101, 'x')" % (db_name, TBL_INSERT_PART))
-     # Make sure the event has been processed.
-     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Verify that the data is present in Impala.
-     data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_PART))
-     assert data.split('\t') == ['101', 'x', '28', '3', '2019']
+         % (db_name, test_part_tblname, tblproperties))
+      # Insert data into partitions.
+      self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
+         "values(101, 'x')" % (db_name, test_part_tblname))
+      # Make sure the event has been processed.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # Verify that the data is present in Impala.
+      data = self.execute_scalar("select * from %s.%s" % (db_name, test_part_tblname))
+      assert data.split('\t') == ['101', 'x', '28', '3', '2019']
 
-     # Test inserting into existing partitions.
-     last_synced_event_id = self.get_last_synced_event_id()
-     self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
-         "values(102, 'y')" % (db_name, TBL_INSERT_PART))
-     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Verify that the data is present in Impala.
-     data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
-         "and year=2019" % (db_name, TBL_INSERT_PART))
-     assert data.split('\t') == ['2']
+      # Test inserting into existing partitions.
+      self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
+         "values(102, 'y')" % (db_name, test_part_tblname))
+      EventProcessorUtils.wait_for_event_processing(self)
+      # Verify that the data is present in Impala.
+      data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
+         "and year=2019" % (db_name, test_part_tblname))
+      assert data.split('\t') == ['2']
 
-     # Test insert overwrite into existing partitions
-     last_synced_event_id = self.get_last_synced_event_id()
-     self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
-         "year=2019)" "values(101, 'z')" % (db_name, TBL_INSERT_PART))
-     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Verify that the data is present in Impala.
-     data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
-         " year=2019 and id=101" % (db_name, TBL_INSERT_PART))
-     assert data.split('\t') == ['101', 'z', '28', '3', '2019']
+      # Test insert overwrite into existing partitions
+      self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
+         "year=2019)" "values(101, 'z')" % (db_name, test_part_tblname))
+      EventProcessorUtils.wait_for_event_processing(self)
+      # Verify that the data is present in Impala.
+      data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
+         " year=2019 and id=101" % (db_name, test_part_tblname))
+      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
 
-  @CustomClusterTestSuite.with_args(
-    catalogd_args="--hms_event_polling_interval_s=1"
-  )
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   @SkipIfHive2.acid
   def test_empty_partition_events_transactional(self, unique_database):
     self._run_test_empty_partition_events(unique_database, True)
 
-  @CustomClusterTestSuite.with_args(
-    catalogd_args="--hms_event_polling_interval_s=1"
-  )
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_empty_partition_events(self, unique_database):
     self._run_test_empty_partition_events(unique_database, False)
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_self_events(self, unique_database):
+    """Runs multiple queries which generate events and makes
+    sure that tables and partitions are not refreshed the queries is run from Impala. If
+    the queries are run from Hive, we make sure that the tables and partitions are
+    refreshed"""
+    self.__run_self_events_test(unique_database, True)
+    self.__run_self_events_test(unique_database, False)
+
   def _run_test_empty_partition_events(self, unique_database, is_transactional):
-    TBLPROPERTIES = ""
-    if is_transactional:
-       TBLPROPERTIES = "TBLPROPERTIES ('transactional'='true'," \
-           "'transactional_properties'='insert_only')"
     test_tbl = unique_database + ".test_events"
+    TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
     self.run_stmt_in_hive("create table {0} (key string, value string) \
       partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
-    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    EventProcessorUtils.wait_for_event_processing(self)
     self.client.execute("describe {0}".format(test_tbl))
 
     self.run_stmt_in_hive(
       "alter table {0} add partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    EventProcessorUtils.wait_for_event_processing(self)
     assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
 
     self.run_stmt_in_hive(
       "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    EventProcessorUtils.wait_for_event_processing(self)
     assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
     self.run_stmt_in_hive(
       "alter table {0} drop partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    EventProcessorUtils.wait_for_event_processing(self)
     assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
     self.run_stmt_in_hive(
       "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    EventProcessorUtils.wait_for_event_processing(self)
     assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  def __get_tbl_location(self, db_name, tbl_name):
+    assert self.hive_client is not None
+    return self.hive_client.get_table(db_name, tbl_name).sd.location
 
-  def wait_for_insert_event_processing(self, previous_event_id):
-    """Waits until the event processor has finished processing insert events. Since two
-    events are created for every insert done through hive, we wait until the event id is
-    incremented by at least two. Returns true if at least two events were processed within
-    self.PROCESSING_TIMEOUT_S, False otherwise.
+  def __get_transactional_tblproperties(self, is_transactional):
     """
-    new_event_id = self.get_last_synced_event_id()
-    success = False
-    end_time = time.time() + self.PROCESSING_TIMEOUT_S
-    while time.time() < end_time:
-      new_event_id = self.get_last_synced_event_id()
-      if new_event_id - previous_event_id >= 2:
-        success = True
-        break
-      time.sleep(0.1)
-    # Wait for catalog update to be propagated.
-    time.sleep(build_flavor_timeout(2, slow_build_timeout=4))
-    return success
+    Util method to generate the tblproperties for transactional tables
+    """
+    tblproperties = ""
+    if is_transactional:
+       tblproperties = "tblproperties ('transactional'='true'," \
+           "'transactional_properties'='insert_only')"
+    return tblproperties
 
-  def get_event_processor_metrics(self):
-    """Scrapes the catalog's /events webpage and return a dictionary with the event
-    processor metrics."""
-    response = requests.get("%s/events?json" % self.CATALOG_URL)
-    assert response.status_code == requests.codes.ok
-    varz_json = json.loads(response.text)
-    metrics = varz_json["event_processor_metrics"].strip().splitlines()
+  def __run_self_events_test(self, db_name, use_impala):
+    recover_tbl_name = self.__get_random_name("tbl_")
+    # create a table similar to alltypes so that we can recover the partitions on it
+    # later in one of the test queries
+    alltypes_tab_location = self.__get_tbl_location("functional", "alltypes")
+    self.client.execute(
+      "create external table {0}.{1} like functional.alltypes location '{2}'".format(
+        db_name, recover_tbl_name, alltypes_tab_location))
+    if use_impala:
+      queries = self.__get_impala_test_queries(db_name, recover_tbl_name)
+      # some queries do not trigger self-event evaluation (creates and drops) however,
+      # its still good to confirm that we don't do unnecessary refreshes in such cases
+      for stmt in queries[False]:
+        self.__exec_sql_and_check_selfevent_counter(stmt, use_impala, False)
+      # All the queries with True key should confirm that the self-events-skipped counter
+      # is also incremented
+      for stmt in queries[True]:
+        self.__exec_sql_and_check_selfevent_counter(stmt, use_impala)
+    else:
+      queries = self.__get_hive_test_queries(db_name, recover_tbl_name)
+      for stmt in queries:
+        self.__exec_sql_and_check_selfevent_counter(stmt, use_impala)
 
-    # Helper to strip a pair of elements
-    def strip_pair(p):
-      return (p[0].strip(), p[1].strip())
+  def __get_impala_test_queries(self, db_name, recover_tbl_name):
+    tbl_name = self.__get_random_name("tbl_")
+    tbl2 = self.__get_random_name("tbl_")
+    view_name = self.__get_random_name("view_")
+    self_event_test_queries = {
+      # Queries which will increment the self-events-skipped counter
+      True: [
+          "comment on database {0} is 'self-event test database'".format(db_name),
+          "alter database {0} set owner user `test-user`".format(db_name),
+          # ALTER_TABLE case
+          "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
+          "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),
+          "alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'".format(db_name,
+                                                                                tbl_name),
+          "alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)".format(db_name, tbl_name),
+          "alter table {0}.{1} DROP COLUMN c1".format(db_name, tbl_name),
+          "alter table {0}.{1} DROP COLUMN c2".format(db_name, tbl_name),
+          "alter table {0}.{1} DROP COLUMN c3".format(db_name, tbl_name),
+          "alter table {0}.{1} set owner user `test-user`".format(db_name, tbl_name),
+          "alter table {0}.{1} set owner role `test-role`".format(db_name, tbl_name),
+          "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl_name, tbl2),
+          "alter view {0}.{1} set owner user `test-view-user`".format(db_name, view_name),
+          "alter view {0}.{1} set owner role `test-view-role`".format(db_name, view_name),
+          "alter view {0}.{1} rename to {0}.{2}".format(db_name, view_name,
+                                                        self.__get_random_name("view_")),
+          # ADD_PARTITION cases
+          # dynamic partition insert (creates new partitions)
+          "insert into table {0}.{1} partition (year,month) "
+          "select * from functional.alltypessmall".format(db_name, tbl2),
+          # add partition
+          "alter table {0}.{1} add if not exists partition (year=1111, month=1)".format(
+            db_name, tbl2),
+          # insert into a existing partition; generates ALTER_PARTITION
+          # TODO add support for insert_events (IMPALA-8632)
+          # "insert into table {0}.{1} partition (year, month) "
+          # "select * from functional.alltypessmall where year=2009 and month=1".format(
+          #  db_name, tbl2),
+          # compute stats will generates ALTER_PARTITION
+          "compute stats {0}.{1}".format(db_name, tbl2),
+          "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name)],
+      # Queries which will not increment the self-events-skipped counter
+      False: [
+          "create table {0}.{1} like functional.alltypessmall "
+          "stored as parquet".format(db_name, tbl_name),
+          "create view {0}.{1} as select * from functional.alltypessmall "
+            "where year=2009".format(db_name, view_name),
+          # we add this statement below just to make sure that the subsequent statement is
+          # a no-op
+          "alter table {0}.{1} add if not exists partition (year=2100, month=1)".format(
+            db_name, tbl_name),
+          "alter table {0}.{1} add if not exists partition (year=2100, month=1)".format(
+            db_name, tbl_name),
+          # DROP_PARTITION cases
+          "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
+            db_name, tbl_name),
+          # drop non-existing partition; essentially this is a no-op
+          "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
+            db_name, tbl_name)]
+    }
+    return self_event_test_queries
 
-    pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
-    return dict(pairs)
+  def __get_hive_test_queries(self, db_name, recover_tbl_name):
+    tbl_name = self.__get_random_name("tbl_")
+    tbl2 = self.__get_random_name("tbl_")
+    view_name = self.__get_random_name("view_")
+    # we use a custom table schema to make it easier to change columns later in the
+    # test_queries
+    self.client.execute("create table {0}.{1} (key int) partitioned by "
+                        "(part int) stored as parquet".format(db_name, tbl_name))
+    self.client.execute(
+      "create view {0}.{1} as select * from functional.alltypessmall where year=2009"
+        .format(db_name, view_name))
+    # events-processor only refreshes loaded tables, hence its important to issue a
+    # refresh here so that table is in loaded state
+    self.client.execute("refresh {0}.{1}".format(db_name, tbl_name))
+    self_event_test_queries = [
+      # ALTER_DATABASE cases
+      "alter database {0} set dbproperties ('comment'='self-event test database')".format(
+        db_name),
+      "alter database {0} set owner user `test-user`".format(db_name),
+      # ALTER_TABLE case
+      "alter table {0}.{1} set tblproperties ('k'='v')".format(db_name, tbl_name),
+      "alter table {0}.{1} add columns (value string)".format(db_name, tbl_name),
+      "alter table {0}.{1} set owner user `test-user`".format(db_name, tbl_name),
+      "alter table {0}.{1} set owner role `test-role`".format(db_name, tbl_name),
+      "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl_name, tbl2),
+      "alter view {0}.{1} rename to {0}.{2}".format(db_name, view_name,
+                                                    self.__get_random_name("view_")),
+      # need to set this config to make sure the dynamic partition insert works below
+      "set hive.exec.dynamic.partition.mode=nonstrict",
+      # ADD_PARTITION cases
+      "insert into table {0}.{1} partition (part=2009) "
+      "select id as key, string_col as value from functional.alltypessmall".format(
+        db_name, tbl2),
+      # add partition
+      "alter table {0}.{1} add if not exists partition (part=1111)".format(
+        db_name, tbl2),
+      # add existing partition; essentially this is a no-op
+      "alter table {0}.{1} add if not exists partition (part=1111)".format(
+        db_name, tbl2),
+      # DROP_PARTITION cases
+      "alter table {0}.{1} drop if exists partition (part=1111)".format(
+        db_name, tbl2),
+      # drop non-existing partition; essentially this is a no-op
+      "alter table {0}.{1} drop if exists partition (part=1111)".format(
+        db_name, tbl2),
+      # compute stats will generates ALTER_PARTITION
+      "analyze table {0}.{1} compute statistics for columns".format(db_name, tbl2),
+      "msck repair table {0}.{1}".format(db_name, recover_tbl_name)
+    ]
+    return self_event_test_queries
 
-  def get_last_synced_event_id(self):
-    """Returns the last_synced_event_id."""
-    metrics = self.get_event_processor_metrics()
-    assert 'last-synced-event-id' in metrics.keys()
-    return int(metrics['last-synced-event-id'])
+  @staticmethod
+  def __get_self_event_metrics():
+    """
+    Gets the self-events-skipped, tables-refreshed and partitions-refreshed metric values
+    from Metastore EventsProcessor
+    """
+    tbls_refreshed_count = EventProcessorUtils.get_event_processor_metric(
+      'tables-refreshed', 0)
+    partitions_refreshed_count = EventProcessorUtils.get_event_processor_metric(
+      'partitions-refreshed', 0)
+    self_events_count = EventProcessorUtils.get_event_processor_metric(
+      'self-events-skipped', 0)
+    return int(self_events_count), int(tbls_refreshed_count), int(
+      partitions_refreshed_count)
+
+  def __exec_sql_and_check_selfevent_counter(self, stmt, use_impala_client,
+                                             check_self_event_counter=True):
+    """
+    Method runs a given query statement using a impala client or hive client based on the
+    argument use_impala_client and confirms if the self-event related counters are as
+    expected based on whether we expect a self-event or not. If the
+    check_self_event_counter is False it skips checking the self-events-skipped metric.
+    """
+    self_events, tbls_refreshed, partitions_refreshed = self.__get_self_event_metrics()
+    if not use_impala_client:
+      self.run_stmt_in_hive(stmt)
+    else:
+      self.client.execute(stmt)
+
+    EventProcessorUtils.wait_for_event_processing(self)
+    self_events_after, tbls_refreshed_after, partitions_refreshed_after = \
+      self.__get_self_event_metrics()
+    # we assume that any event which comes due to stmts run from impala-client are
+    # self-events
+    if use_impala_client:
+      # self-event counter must increase if this is a self-event if
+      # check_self_event_counter is set
+      if check_self_event_counter:
+        assert self_events_after > self_events
+      # if this is a self-event, no table or partitions should be refreshed
+      assert tbls_refreshed == tbls_refreshed_after
+      assert partitions_refreshed == partitions_refreshed_after
+    else:
+      # hive was used to run the stmts, any events generated should not have been deemed
+      # as self events
+      assert self_events == self_events_after
+
+  @staticmethod
+  def __get_random_name(prefix=''):
+    """
+    Gets a random name used to create unique database or table
+    """
+    return prefix + ''.join(random.choice(string.ascii_lowercase) for i in range(5))
diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py
index 78123e3..c50d14b 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -19,37 +19,45 @@
 # modifies the metadata via Hive and checks that the modification
 # succeeded by querying Impala, or vice versa.
 
+import logging
 import requests
 import time
 import json
-from tests.common.environ import build_flavor_timeout
 
-
+LOG = logging.getLogger('event_processor_utils')
+LOG.setLevel(level=logging.DEBUG)
 class EventProcessorUtils(object):
 
   DEFAULT_CATALOG_URL = "http://localhost:25020"
 
   @staticmethod
-  def wait_for_event_processing(hive_client, timeout=10):
+  def wait_for_event_processing(test_suite, timeout=10):
       """Waits till the event processor has synced to the latest event id from metastore
          or the timeout value in seconds whichever is earlier"""
       success = False
       assert timeout > 0
-      assert hive_client is not None
-      current_event_id = EventProcessorUtils.get_current_notification_id(hive_client)
+      assert test_suite.hive_client is not None
+      current_event_id = EventProcessorUtils.get_current_notification_id(
+        test_suite.hive_client)
+      LOG.info("Waiting until events processor syncs to event id:" + str(current_event_id))
       end_time = time.time() + timeout
       while time.time() < end_time:
-        new_event_id = EventProcessorUtils.get_last_synced_event_id()
-        if new_event_id >= current_event_id:
+        last_synced_id = EventProcessorUtils.get_last_synced_event_id()
+        if last_synced_id >= current_event_id:
+          LOG.debug(
+            "Metric last-synced-event-id has reached the desired value:" + str(
+              last_synced_id))
           success = True
           break
         time.sleep(0.1)
       if not success:
         raise Exception(
-          "Event processor did not sync till last known event id{0} \
+          "Event processor did not sync till last known event id {0} \
           within {1} seconds".format(current_event_id, timeout))
-      # Wait for catalog update to be propagated.
-      time.sleep(build_flavor_timeout(6, slow_build_timeout=10))
+      # Wait until the impalad catalog versions agree with the catalogd's version.
+      catalogd_version = test_suite.cluster.catalogd.service.get_catalog_version()
+      for impalad in test_suite.cluster.impalads:
+        impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
       return success
 
   @staticmethod
@@ -69,6 +77,14 @@
      return dict(pairs)
 
   @staticmethod
+  def get_event_processor_metric(metric_key, default_val=None):
+    """Returns the event processor metric from the /events catalog debug page"""
+    metrics = EventProcessorUtils.get_event_processor_metrics()
+    if metric_key not in metrics:
+      return default_val
+    return metrics[metric_key]
+
+  @staticmethod
   def get_last_synced_event_id():
     """Returns the last_synced_event_id."""
     metrics = EventProcessorUtils.get_event_processor_metrics()
@@ -88,4 +104,4 @@
   def get_current_notification_id(hive_client):
     """Returns the current notification from metastore"""
     assert hive_client is not None
-    return hive_client.get_current_notificationEventId()
+    return int(hive_client.get_current_notificationEventId().eventId)