// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.catalog.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.DatabaseNotFoundException;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.Pair;
import org.apache.impala.common.Reference;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.ClassUtil;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.apache.hadoop.hive.common.FileUtils;

/**
 * Main class which provides Metastore event objects for various event types. Also
 * provides a MetastoreEventFactory to get or create the event instances for a given event
 * type
 */
public class MetastoreEvents {

  /**
   * This enum contains keys for parameters added in Metastore entities, relevant for
   * event processing. When eventProcessor is instantiated, we make sure during config
   * validation that these parameters are not filtered out through the Metastore config
   * EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS.
   */
  public enum MetastoreEventPropertyKey {
    // key to be used for catalog version in table properties for detecting self-events
    CATALOG_VERSION("impala.events.catalogVersion"),
    // key to be used for catalog service id for detecting self-events
    CATALOG_SERVICE_ID("impala.events.catalogServiceId"),
    // flag to be set in the table/database parameters to disable event based metadata
    // sync. Note the this is a user-facing property. Any changes to this key name
    // will break backwards compatibility
    DISABLE_EVENT_HMS_SYNC("impala.disableHmsSync");

    private String key_;

    MetastoreEventPropertyKey(String key) { this.key_ = key; }

    public String getKey() { return key_; }
  }

  public enum MetastoreEventType {
    CREATE_TABLE("CREATE_TABLE"),
    DROP_TABLE("DROP_TABLE"),
    ALTER_TABLE("ALTER_TABLE"),
    CREATE_DATABASE("CREATE_DATABASE"),
    DROP_DATABASE("DROP_DATABASE"),
    ALTER_DATABASE("ALTER_DATABASE"),
    ADD_PARTITION("ADD_PARTITION"),
    ALTER_PARTITION("ALTER_PARTITION"),
    DROP_PARTITION("DROP_PARTITION"),
    INSERT("INSERT"),
    OTHER("OTHER");

    private final String eventType_;

    MetastoreEventType(String msEventType) {
      this.eventType_ = msEventType;
    }

    @Override
    public String toString() {
      return eventType_;
    }

    /**
     * Returns the MetastoreEventType from a given string value of event from Metastore's
     * NotificationEvent.eventType. If none of the supported MetastoreEventTypes match,
     * return OTHER
     *
     * @param eventType EventType value from the <code>NotificationEvent</code>
     */
    public static MetastoreEventType from(String eventType) {
      for (MetastoreEventType metastoreEventType : values()) {
        if (metastoreEventType.eventType_.equalsIgnoreCase(eventType)) {
          return metastoreEventType;
        }
      }
      return OTHER;
    }
  }

  /**
   * Factory class to create various MetastoreEvents.
   */
  public static class MetastoreEventFactory {

    private static final Logger LOG =
        LoggerFactory.getLogger(MetastoreEventFactory.class);

    // catalog service instance to be used for creating eventHandlers
    private final CatalogServiceCatalog catalog_;
    // metrics registry to be made available for each events to publish metrics
    private final Metrics metrics_;

    public MetastoreEventFactory(CatalogServiceCatalog catalog, Metrics metrics) {
      this.catalog_ = Preconditions.checkNotNull(catalog);
      this.metrics_ = Preconditions.checkNotNull(metrics);
    }

    /**
     * creates instance of <code>MetastoreEvent</code> used to process a given event type.
     * If the event type is unknown, returns a IgnoredEvent
     */
    private MetastoreEvent get(NotificationEvent event)
        throws MetastoreNotificationException {
      Preconditions.checkNotNull(event.getEventType());
      MetastoreEventType metastoreEventType =
          MetastoreEventType.from(event.getEventType());
      switch (metastoreEventType) {
        case CREATE_TABLE: return new CreateTableEvent(catalog_, metrics_, event);
        case DROP_TABLE: return new DropTableEvent(catalog_, metrics_, event);
        case ALTER_TABLE: return new AlterTableEvent(catalog_, metrics_, event);
        case CREATE_DATABASE: return new CreateDatabaseEvent(catalog_, metrics_, event);
        case DROP_DATABASE: return new DropDatabaseEvent(catalog_, metrics_, event);
        case ALTER_DATABASE: return new AlterDatabaseEvent(catalog_, metrics_, event);
        case ADD_PARTITION:
          // add partition events triggers invalidate table currently
          return new AddPartitionEvent(catalog_, metrics_, event);
        case DROP_PARTITION:
          // drop partition events triggers invalidate table currently
          return new DropPartitionEvent(catalog_, metrics_, event);
        case ALTER_PARTITION:
          // alter partition events triggers invalidate table currently
          return new AlterPartitionEvent(catalog_, metrics_, event);
        case INSERT:
          // Insert events trigger refresh on a table/partition currently
          return new InsertEvent(catalog_, metrics_, event);
        default:
          // ignore all the unknown events by creating a IgnoredEvent
          return new IgnoredEvent(catalog_, metrics_, event);
      }
    }

    /**
     * Given a list of notification events, returns a list of <code>MetastoreEvent</code>
     * In case there are create events which are followed by drop events for the same
     * object, the create events are filtered out. The drop events do not need to be
     * filtered out
     *
     * This is needed to avoid the replay problem. For example, if catalog created and
     * removed a table, the create event received will try to add the object again. This
     * table will be visible until the drop table event is processed. This can be avoided
     * by "looking ahead" in the event stream to see if the table with the same name was
     * dropped. In such a case, the create event can be ignored
     *
     * @param events NotificationEvents fetched from metastore
     * @return A list of MetastoreEvents corresponding to the given the NotificationEvents
     * @throws MetastoreNotificationException if a NotificationEvent could not be
     *     parsed into MetastoreEvent
     */
    List<MetastoreEvent> getFilteredEvents(List<NotificationEvent> events)
        throws MetastoreNotificationException {
      Preconditions.checkNotNull(events);
      if (events.isEmpty()) return Collections.emptyList();

      List<MetastoreEvent> metastoreEvents = new ArrayList<>(events.size());
      for (NotificationEvent event : events) {
        metastoreEvents.add(get(event));
      }
      // filter out the create events which has a corresponding drop event later
      int sizeBefore = metastoreEvents.size();
      int numFilteredEvents = 0;
      int i = 0;
      while (i < metastoreEvents.size()) {
        MetastoreEvent currentEvent = metastoreEvents.get(i);
        String eventDb = currentEvent.getDbName();
        String eventTbl = currentEvent.getTableName();
        // if the event is on blacklisted db or table we should filter it out
        if (catalog_.isBlacklistedDb(eventDb) || (eventTbl != null && catalog_
            .isBlacklistedTable(eventDb, eventTbl))) {
          String blacklistedObject = eventTbl != null ? new TableName(eventDb,
              eventTbl).toString() : eventDb;
          LOG.info(currentEvent.debugString("Filtering out this event since it is on a "
              + "blacklisted database or table %s", blacklistedObject));
          metastoreEvents.remove(i);
          numFilteredEvents++;
        } else if (currentEvent.isRemovedAfter(metastoreEvents.subList(i + 1,
            metastoreEvents.size()))) {
          LOG.info(currentEvent.debugString("Filtering out this event since the object is "
              + "either removed or renamed later in the event stream"));
          metastoreEvents.remove(i);
          numFilteredEvents++;
        } else {
          i++;
        }
      }
      LOG.info(String.format("Total number of events received: %d Total number of events "
          + "filtered out: %d", sizeBefore, numFilteredEvents));
      metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
              .inc(numFilteredEvents);
      return metastoreEvents;
    }
  }

  /**
   * Abstract base class for all MetastoreEvents. A MetastoreEvent is a object used to
   * process a NotificationEvent received from metastore. It is self-contained with all
   * the information needed to take action on catalog based on a the given
   * NotificationEvent
   */
  public static abstract class MetastoreEvent {

    // String.format compatible string to prepend event id and type
    private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s ";

    // logger format compatible string to prepend to a log formatted message
    private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";

    // CatalogServiceCatalog instance on which the event needs to be acted upon
    protected final CatalogServiceCatalog catalog_;

    // the notification received from metastore which is processed by this
    protected final NotificationEvent event_;

    // Logger available for all the sub-classes
    protected final Logger LOG = LoggerFactory.getLogger(this.getClass());

    // dbName from the event
    protected final String dbName_;

    // tblName from the event
    protected final String tblName_;

    // eventId of the event. Used instead of calling getter on event_ everytime
    protected final long eventId_;

    // eventType from the NotificationEvent
    protected final MetastoreEventType eventType_;

    // Actual notificationEvent object received from Metastore
    protected final NotificationEvent metastoreNotificationEvent_;

    // metrics registry so that events can add metrics
    protected final Metrics metrics_;

    // version number from the event object parameters used for self-event detection
    protected long versionNumberFromEvent_ = -1;
    // service id from the event object parameters used for self-event detection
    protected String serviceIdFromEvent_ = null;
    // the list of versions which this catalog expects to be see in the
    // self-generated events in order. Anything which is seen out of order of this list
    // will be used to determine that this is not a self-event and table will be
    // invalidated. See <code>isSelfEvent</code> for more details.
    protected List<Long> pendingVersionNumbersFromCatalog_ = Collections.EMPTY_LIST;

    MetastoreEvent(CatalogServiceCatalog catalogServiceCatalog, Metrics metrics,
        NotificationEvent event) {
      this.catalog_ = catalogServiceCatalog;
      this.event_ = event;
      this.eventId_ = event_.getEventId();
      this.eventType_ = MetastoreEventType.from(event.getEventType());
      // certain event types in Hive-3 like COMMIT_TXN may not have dbName set
      this.dbName_ = event.getDbName();
      this.tblName_ = event.getTableName();
      this.metastoreNotificationEvent_ = event;
      this.metrics_ = metrics;
    }

    public String getDbName() { return dbName_; }

    public String getTableName() { return tblName_; }

    /**
     * Process this event if it is enabled based on the flags on this object
     *
     * @throws CatalogException If  Catalog operations fail
     * @throws MetastoreNotificationException If NotificationEvent parsing fails
     */
    public void processIfEnabled()
        throws CatalogException, MetastoreNotificationException {
      if (isEventProcessingDisabled()) {
        LOG.info(debugString("Skipping this event because of flag evaluation"));
        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
        return;
      }
      process();
    }

    /**
     * Process the information available in the NotificationEvent to take appropriate
     * action on Catalog
     *
     * @throws MetastoreNotificationException in case of event parsing errors out
     * @throws CatalogException in case catalog operations could not be performed
     */
    protected abstract void process()
        throws MetastoreNotificationException, CatalogException;

    /**
     * Helper method to get debug string with helpful event information prepended to the
     * message. This can be used to generate helpful exception messages
     *
     * @param msgFormatString String value to be used in String.format() for the given
     *     message
     * @param args args to the <code>String.format()</code> for the given
     *     msgFormatString
     */
    protected String debugString(String msgFormatString, Object... args) {
      String formatString =
          new StringBuilder(STR_FORMAT_EVENT_ID_TYPE).append(msgFormatString).toString();
      Object[] formatArgs = getLogFormatArgs(args);
      return String.format(formatString, formatArgs);
    }

    /**
     * Helper method to generate the format args after prepending the event id and type
     */
    private Object[] getLogFormatArgs(Object[] args) {
      Object[] formatArgs = new Object[args.length + 2];
      formatArgs[0] = eventId_;
      formatArgs[1] = eventType_;
      int i = 2;
      for (Object arg : args) {
        formatArgs[i] = arg;
        i++;
      }
      return formatArgs;
    }

    /**
     * Logs at info level the given log formatted string and its args. The log formatted
     * string should have {} pair at the appropriate location in the string for each arg
     * value provided. This method prepends the event id and event type before logging the
     * message. No-op if the log level is not at INFO
     */
    protected void infoLog(String logFormattedStr, Object... args) {
      if (!LOG.isInfoEnabled()) return;
      String formatString =
          new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
      Object[] formatArgs = getLogFormatArgs(args);
      LOG.info(formatString, formatArgs);
    }

    /**
     * Similar to infoLog excepts logs at debug level
     */
    protected void debugLog(String logFormattedStr, Object... args) {
      if (!LOG.isDebugEnabled()) return;
      String formatString =
          new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
      Object[] formatArgs = getLogFormatArgs(args);
      LOG.debug(formatString, formatArgs);
    }

    /**
     * Search for a inverse event (for example drop_table is a inverse event for
     * create_table) for this event from a given list of notificationEvents starting for
     * the startIndex. This is useful for skipping certain events from processing
     *
     * @param events List of NotificationEvents to be searched
     * @return true if the object is removed after this event, else false
     */
    protected boolean isRemovedAfter(List<MetastoreEvent> events) {
      return false;
    }

    /**
     * Returns true if event based sync is disabled for this table/database associated
     * with this event
     */
    protected abstract boolean isEventProcessingDisabled();

    /**
     * This method detects if this event is self-generated or not (see class
     * documentation of <code>MetastoreEventProcessor</code> to understand what a
     * self-event is).
     *
     * In order to determine this, it compares the value of catalogVersion from the
     * event with the list of pending version numbers stored in the catalog
     * database/table. The event could be generated by another instance of CatalogService
     * which can potentially have the same versionNumber. In order to resolve such
     * conflict, it compares the CatalogService's serviceId before comparing the version
     * number. If it is determined that this is indeed a self-event, this method also
     * clears the version number from the catalog database/table's list of pending
     * versions for in-flight events. This is needed so that a subsequent event with the
     * same service id or version number is not incorrectly determined as a self-event. A
     * subsequent event with the same serviceId and versionNumber is most likely generated
     * by a non-Impala system because it cached the table object having those values of
     * serviceId and version. More details on complete flow of self-event handling
     * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
     *
     * @return True if this event is a self-generated event. If the returned value is
     * true, this method also clears the version number from the catalog database/table.
     * Returns false if the version numbers or service id don't match
     * @throws CatalogException in case of exceptions while removing the version number
     * from the database/table or when reading the values of version list from catalog
     * database/table
     */
    protected boolean isSelfEvent() throws CatalogException {
      initSelfEventIdentifiersFromEvent();
      if (versionNumberFromEvent_ == -1 || pendingVersionNumbersFromCatalog_.isEmpty()) {
        return false;
      }

      // first check if service id is a match, then check if the event version is what we
      // expect in the list
      if (catalog_.getCatalogServiceId().equals(serviceIdFromEvent_)
          && pendingVersionNumbersFromCatalog_.get(0).equals(versionNumberFromEvent_)) {
        // we see this version for the first time. This is a self-event
        // remove this version number from the catalog so that next time we see this
        // version we don't determine it wrongly as self-event
        // TODO we should improve by atomically doing this check and invalidating the
        // table to avoid any races. Currently, it is possible the some other thread
        // in CatalogService changes the pendingVersionNumbersFromCatalog after do
        // the check here. However, there are only two possible operations that can
        // happen from outside with respect to this version list. Either some thread
        // adds a new version to the list after we looked at it, or the table is
        // invalidated. In both the cases, it is OK since the new version added is
        // guaranteed to be greater than versionNumberFromEvent and if the table is
        // invalidated, this operation (this whole event) becomes a no-op
        catalog_.removeFromInFlightVersionsForEvents(
            dbName_, tblName_, versionNumberFromEvent_);
        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
        return true;
      }
      return false;
    }

    /**
     * This method should be implemented by subclasses to initialize the values of
     * self-event identifiers by parsing the event data. These identifiers are later
     * used to determine if this event needs to be processed or not. See
     * <code>isSelfEvent</code> method for details.
     */
    protected void initSelfEventIdentifiersFromEvent() {
      throw new UnsupportedOperationException(
          String.format("%s is not supported", ClassUtil.getMethodName()));
    }

    protected static String getStringProperty(
        Map<String, String> params, String key, String defaultVal) {
      if (params == null) return defaultVal;
      return params.getOrDefault(key, defaultVal);
    }
  }

  /**
   * Base class for all the table events
   */
  public static abstract class MetastoreTableEvent extends MetastoreEvent {
    // tblName from the event
    protected final String tblName_;

    // tbl object from the Notification event, corresponds to the before tableObj in
    // case of alter events
    protected org.apache.hadoop.hive.metastore.api.Table msTbl_;

    private MetastoreTableEvent(CatalogServiceCatalog catalogServiceCatalog,
        Metrics metrics, NotificationEvent event) {
      super(catalogServiceCatalog, metrics, event);
      Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
      tblName_ = Preconditions.checkNotNull(event.getTableName());
      debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
          getFullyQualifiedTblName());
    }


    /**
     * Util method to return the fully qualified table name which is of the format
     * dbName.tblName for this event
     */
    protected String getFullyQualifiedTblName() {
      return new TableName(dbName_, tblName_).toString();
    }

    /**
     * Util method to issue invalidate on a given table on the catalog. This method
     * atomically invalidates the table if it exists in the catalog. No-op if the table
     * does not exist
     */
    protected boolean invalidateCatalogTable() {
      boolean tableInvalidated =
          catalog_.invalidateTableIfExists(dbName_, tblName_) != null;
      if (tableInvalidated) {
        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_INVALIDATES).inc();
      }
      return tableInvalidated;
    }

    /**
     * Checks if the table level property is set in the parameters of the table from the
     * event. If it is available, it takes precedence over database level flag for this
     * table. If the table level property is not set, returns the value from the database
     * level property.f
     *
     * @return Boolean value of the table property with the key
     *     <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code>. Else,
     *     returns the database property which is associated with this table. Returns
     *     false if neither of the properties are set.
     */
    @Override
    protected boolean isEventProcessingDisabled() {
      Preconditions.checkNotNull(msTbl_);
      Boolean tblProperty = getHmsSyncProperty(msTbl_);
      if (tblProperty != null) {
        infoLog("Found table level flag {} is set to {} for table {}",
            MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
            tblProperty.toString(),
            getFullyQualifiedTblName());
        return tblProperty;
      }
      // if the tbl property is not set check at db level
      String dbFlagVal = catalog_.getDbProperty(dbName_,
          MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
      if (dbFlagVal != null) {
        // no need to spew unnecessary logs. Most tables/databases are expected to not
        // have this flag set when event based HMS polling is enabled
        debugLog("Table level flag is not set. Db level flag {} is {} for "
                + "database {}",
            MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
            dbFlagVal, dbName_);
      }
      // flag value of null also returns false
      return Boolean.valueOf(dbFlagVal);
    }

    /**
     * Gets the value of the parameter with the key
     * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code> from the given
     * table
     *
     * @return the Boolean value of the property with the key
     *     <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code> if it is
     *     available else returns null
     */
    public static Boolean getHmsSyncProperty(
        org.apache.hadoop.hive.metastore.api.Table tbl) {
      if (!tbl.isSetParameters()) return null;
      String val =
          tbl.getParameters()
              .get(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
      if (val == null || val.isEmpty()) return null;
      return Boolean.valueOf(val);
    }

    /**
     * Util method to create partition key-value map from HMS Partition objects.
     */
    protected static List<TPartitionKeyValue> getTPartitionSpecFromHmsPartition(
        org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
      List<TPartitionKeyValue> tPartSpec = new ArrayList<>();
      List<org.apache.hadoop.hive.metastore.api.FieldSchema> fsList =
          msTbl.getPartitionKeys();
      List<String> partVals = partition.getValues();
      Preconditions.checkNotNull(partVals);
      Preconditions.checkState(fsList.size() == partVals.size());
      for (int i = 0; i < fsList.size(); i++) {
        tPartSpec.add(new TPartitionKeyValue(fsList.get(i).getName(), partVals.get(i)));
      }
      return tPartSpec;
    }

    /**
     * Util method to create a partition spec string out of a TPartitionKeyValue objects.
     */
    protected static String constructPartitionStringFromTPartitionSpec(
        List<TPartitionKeyValue> tPartSpec) {
      List<String> partitionCols = new ArrayList<>();
      List<String> partitionVals = new ArrayList<>();
      for (TPartitionKeyValue kv: tPartSpec) {
        partitionCols.add(kv.getName());
        partitionVals.add(kv.getValue());
      }
      String partString = FileUtils.makePartName(partitionCols, partitionVals);
      return partString;
    }

    /*
     * Helper function to initiate a table reload on Catalog. Re-throws the exception if
     * the catalog operation throws.
     */
    protected void reloadTableFromCatalog(String operation) throws CatalogException {
      if (!catalog_.reloadTableIfExists(dbName_, tblName_,
              "Processing " + operation + " event from HMS")) {
        debugLog("Automatic refresh on table {} failed as the table is not "
            + "present either in catalog or metastore.", getFullyQualifiedTblName());
      } else {
        infoLog("Table {} has been refreshed after " + operation +".",
            getFullyQualifiedTblName());
      }
    }
  }

  /**
   * Base class for all the database events
   */
  public static abstract class MetastoreDatabaseEvent extends MetastoreEvent {
    MetastoreDatabaseEvent(CatalogServiceCatalog catalogServiceCatalog, Metrics metrics,
        NotificationEvent event) {
      super(catalogServiceCatalog, metrics, event);
      Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
      debugLog("Creating event {} of type {} on database {}", eventId_,
              eventType_, dbName_);
    }

    /**
     * Even though there is a database level property
     * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code> it is only used
     * for tables within that
     * database. As such this property does not control if the database level DDLs are
     * skipped or not.
     *
     * @return false
     */
    @Override
    protected boolean isEventProcessingDisabled() {
      return false;
    }
  }

  /**
   * MetastoreEvent for CREATE_TABLE event type
   */
  public static class CreateTableEvent extends MetastoreTableEvent {
    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private CreateTableEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(eventType_));
      Preconditions
          .checkNotNull(event.getMessage(), debugString("Event message is null"));
      CreateTableMessage createTableMessage =
          MetastoreEventsProcessor.getMessageDeserializer()
              .getCreateTableMessage(event.getMessage());
      try {
        msTbl_ = createTableMessage.getTableObj();
      } catch (Exception e) {
        throw new MetastoreNotificationException(
            debugString("Unable to deserialize the event message"), e);
      }
    }

    /**
     * If the table provided in the catalog does not exist in the catalog, this method
     * will create it. If the table in the catalog already exists, it relies of the
     * creationTime of the Metastore Table to resolve the conflict. If the catalog table's
     * creation time is less than creationTime of the table from the event, it will be
     * overridden. Else, it will ignore the event
     */
    @Override
    public void process() throws MetastoreNotificationException {
      // check if the table exists already. This could happen in corner cases of the
      // table being dropped and recreated with the same name or in case this event is
      // a self-event (see description of self-event in the class documentation of
      // MetastoreEventsProcessor)
      try {
        if (!catalog_.addTableIfNotExists(dbName_, tblName_)) {
          debugLog(
              "Not adding the table {} since it already exists in catalog", tblName_);
          return;
        }
      } catch (CatalogException e) {
        // if a exception is thrown, it could be due to the fact that the db did not
        // exist in the catalog cache. This could only happen if the previous
        // create_database event for this table errored out
        throw new MetastoreNotificationException(debugString(
            "Unable to add table while processing for table %s because the "
                + "database doesn't exist. This could be due to a previous error while "
                + "processing CREATE_DATABASE event for the database %s",
            getFullyQualifiedTblName(), dbName_), e);
      }
      debugLog("Added a table {}", getFullyQualifiedTblName());
    }

    @Override
    public boolean isRemovedAfter(List<MetastoreEvent> events) {
      Preconditions.checkNotNull(events);
      for (MetastoreEvent event : events) {
        if (event.eventType_.equals(MetastoreEventType.DROP_TABLE)) {
          DropTableEvent dropTableEvent = (DropTableEvent) event;
          if (dbName_.equalsIgnoreCase(dropTableEvent.dbName_) && tblName_
              .equalsIgnoreCase(dropTableEvent.tblName_)) {
            infoLog("Found table {} is removed later in event {} type {}", tblName_,
                dropTableEvent.eventId_, dropTableEvent.eventType_);
            return true;
          }
        } else if (event.eventType_.equals(MetastoreEventType.ALTER_TABLE)) {
          // renames are implemented as a atomic (drop+create) so rename events can
          // also be treated as a inverse event of the create_table event. Consider a
          // DDL op sequence like create table, alter table rename from impala. Since
          // the rename operation is internally implemented as drop+add, processing a
          // create table event on this cluster will show up the table for small window
          // of time, until the actual rename event is processed. If however, we ignore
          // the create table event, the alter rename event just becomes a addIfNotExists
          // event which is valid for both a self-event and external event cases
          AlterTableEvent alterTableEvent = (AlterTableEvent) event;
          if (alterTableEvent.isRename_
              && dbName_.equalsIgnoreCase(alterTableEvent.msTbl_.getDbName())
              && tblName_.equalsIgnoreCase(alterTableEvent.msTbl_.getTableName())) {
            infoLog("Found table {} is renamed later in event {} type {}", tblName_,
                alterTableEvent.eventId_, alterTableEvent.eventType_);
            return true;
          }
        }
      }
      return false;
    }
  }

  /**
   *  Metastore event handler for INSERT events. Handles insert events at both table
   *  and partition scopes.
   */
  public static class InsertEvent extends MetastoreTableEvent {

    // Represents the partition for this insert. Null if the table is unpartitioned.
    private Partition insertPartition_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    @VisibleForTesting
    InsertEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.INSERT.equals(eventType_));
      InsertMessage insertMessage =
          MetastoreEventsProcessor.getMessageDeserializer()
              .getInsertMessage(event.getMessage());
      try {
        msTbl_ = Preconditions.checkNotNull(insertMessage.getTableObj());
        insertPartition_ = insertMessage.getPtnObj();
      } catch (Exception e) {
        throw new MetastoreNotificationException(debugString("Unable to "
            + "parse insert message"), e);
      }
    }

    /**
     * Currently we do not check for self-events in Inserts. Existing self-events logic
     * cannot be used for insert events since firing insert event does not allow us to
     * modify table parameters in HMS. Hence, we cannot get CatalogServiceIdentifiers in
     * Insert Events.
     * TODO: Handle self-events for insert case.
     */
    @Override
    public void process() throws MetastoreNotificationException {
      // Reload the whole table if it's a transactional table.
      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
        insertPartition_ = null;
      }

      if (insertPartition_ != null) {
        processPartitionInserts();
      } else {
        processTableInserts();
      }
    }

    /**
     * Process partition inserts
     */
    private void processPartitionInserts() throws MetastoreNotificationException {
      // For partitioned table, refresh the partition only.
      Preconditions.checkNotNull(insertPartition_);
      List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
          insertPartition_);
      try {
        // Ignore event if table or database is not in catalog. Throw exception if
        // refresh fails. If the partition does not exist in metastore the reload
        // method below removes it from the catalog
        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
            "processing partition-level INSERT event from HMS")) {
          debugLog("Refresh of table {} partition {} after insert "
                  + "event failed as the table is not present in the catalog.",
              getFullyQualifiedTblName(), (tPartSpec));
        } else {
          infoLog("Table {} partition {} has been refreshed after insert.",
              getFullyQualifiedTblName(),
              constructPartitionStringFromTPartitionSpec(tPartSpec));
        }
      } catch (DatabaseNotFoundException e) {
        debugLog("Refresh of table {} partition {} for insert "
                + "event failed as the database is not present in the catalog.",
            getFullyQualifiedTblName(),
            constructPartitionStringFromTPartitionSpec(tPartSpec));
      } catch (CatalogException e) {
        throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                + "partition on table {} partition {} failed. Event processing cannot "
                + "continue. Issue and invalidate command to reset the event processor "
                + "state.", getFullyQualifiedTblName(),
            constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
      }
    }

    /**
     *  Process unpartitioned table inserts
     */
    private void processTableInserts() throws MetastoreNotificationException {
      // For non-partitioned tables, refresh the whole table.
      Preconditions.checkArgument(insertPartition_ == null);
      try {
        // Ignore event if table or database is not in the catalog. Throw exception if
        // refresh fails.
        reloadTableFromCatalog("table-level INSERT");
      } catch (DatabaseNotFoundException e) {
        debugLog("Automatic refresh of table {} insert failed as the "
            + "database is not present in the catalog.", getFullyQualifiedTblName());
      } catch (CatalogException e) {
        if (e instanceof TableLoadingException &&
            e.getCause() instanceof NoSuchObjectException) {
          LOG.warn(
              "Ignoring the refresh of the table since the table does"
                  + " not exist in metastore anymore");
        } else {
          throw new MetastoreNotificationNeedsInvalidateException(
              debugString("Refresh table {} failed. Event processing "
                  + "cannot continue. Issue an invalidate metadata command to reset "
                  + "the event processor state.", getFullyQualifiedTblName()), e);
        }
      }
    }
  }

  /**
   * MetastoreEvent for ALTER_TABLE event type
   */
  public static class AlterTableEvent extends TableInvalidatingEvent {
    protected org.apache.hadoop.hive.metastore.api.Table tableBefore_;
    // the table object after alter operation, as parsed from the NotificationEvent
    protected org.apache.hadoop.hive.metastore.api.Table tableAfter_;
    // true if this alter event was due to a rename operation
    private final boolean isRename_;
    // value of event sync flag for this table before the alter operation
    private final Boolean eventSyncBeforeFlag_;
    // value of the event sync flag if available at this table after the alter operation
    private final Boolean eventSyncAfterFlag_;
    // value of the db flag at the time of event creation
    private final boolean dbFlagVal;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    @VisibleForTesting
    AlterTableEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(eventType_));
      JSONAlterTableMessage alterTableMessage =
          (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
              .getAlterTableMessage(event.getMessage());
      try {
        msTbl_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
        tableAfter_ = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
        tableBefore_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
      } catch (Exception e) {
        throw new MetastoreNotificationException(
            debugString("Unable to parse the alter table message"), e);
      }
      // this is a rename event if either dbName or tblName of before and after object
      // changed
      isRename_ = !msTbl_.getDbName().equalsIgnoreCase(tableAfter_.getDbName())
          || !msTbl_.getTableName().equalsIgnoreCase(tableAfter_.getTableName());
      eventSyncBeforeFlag_ = getHmsSyncProperty(msTbl_);
      eventSyncAfterFlag_ = getHmsSyncProperty(tableAfter_);
      dbFlagVal =
          Boolean.valueOf(catalog_.getDbProperty(dbName_,
              MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey()));
    }

    @Override
    protected void initSelfEventIdentifiersFromEvent() {
      versionNumberFromEvent_ = Long.parseLong(
          getStringProperty(tableAfter_.getParameters(),
              MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
      serviceIdFromEvent_ =
          getStringProperty(tableAfter_.getParameters(),
              MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
      try {
        if (isRename_) {
          //if this is rename event then identifiers will be in tableAfter
          pendingVersionNumbersFromCatalog_ = catalog_
              .getInFlightVersionsForEvents(tableAfter_.getDbName(),
                  tableAfter_.getTableName());
        } else {
          pendingVersionNumbersFromCatalog_ = catalog_
              .getInFlightVersionsForEvents(msTbl_.getDbName(), msTbl_.getTableName());
        }
      } catch (TableNotFoundException | DatabaseNotFoundException e) {
        debugLog("Received exception {}. Ignoring self-event evaluation",
            e.getMessage());
      }
    }

    /**
     * If the ALTER_TABLE event is due a table rename, this method removes the old table
     * and creates a new table with the new name. Else, this just issues a invalidate
     * table on the tblName from the event
     */
    @Override
    public void process() throws MetastoreNotificationException, CatalogException {
      // Determine whether this is an event which we have already seen or if it is a new
      // event
      if (isSelfEvent()) {
        infoLog("Not processing the event as it is a self-event or "
            + "update is already present in catalog.");
        return;
      }
      // Ignore the event if this is a trivial event. See javadoc for
      // canBeSkipped() for examples.
      if (canBeSkipped()) {
        infoLog("Not processing this event as it only modifies some table parameters "
            + "which can be ignored.");
        return;
      }
      // in case of table level alters from external systems it is better to do a full
      // invalidate  eg. this could be due to as simple as adding a new parameter or a
      // full blown adding  or changing column type
      // detect the special where a table is renamed
      if (!isRename_) {
        // table is not renamed, need to invalidate
        if (!invalidateCatalogTable()) {
          if (wasEventSyncTurnedOn()) {
            // we received this alter table event on a non-existing table. We also
            // detect that event sync was turned on in this event. This may mean that
            // the table creation was skipped earlier because event sync was turned off
            // we don't really know how many of events we have skipped till now because
            // the sync was disabled all this while before we receive such a event. We
            // error on the side of caution by stopping the event processing and
            // letting the user to issue a invalidate metadata to reset the state
            throw new MetastoreNotificationNeedsInvalidateException(debugString(
                "Detected that event sync was turned on for the table %s "
                    + "and the table does not exist. Event processing cannot be "
                    + "continued further. Issue a invalidate metadata command to reset "
                    + "the event processing state", getFullyQualifiedTblName()));
          }
          debugLog("Table {} does not need to be "
                  + "invalidated since it does not exist anymore",
              getFullyQualifiedTblName());
        } else {
          infoLog("Table {} is invalidated", getFullyQualifiedTblName());
        }
        return;
      }
      // table was renamed, remove the old table
      infoLog("Found that {} table was renamed. Renaming it by "
              + "remove and adding a new table",
          new TableName(msTbl_.getDbName(), msTbl_.getTableName()));
      TTableName oldTTableName =
          new TTableName(msTbl_.getDbName(), msTbl_.getTableName());
      TTableName newTTableName =
          new TTableName(tableAfter_.getDbName(), tableAfter_.getTableName());

      // Table is renamed if old db and table exist in catalog. If the rename is to a
      // different database, we check if this other database exists in catalog. If
      // either the old table, old database or new database are not in catalog, we skip
      // this event.
      if (!catalog_.renameTableIfExists(oldTTableName, newTTableName)) {
        debugLog("Did not remove old table to rename table {} to {} since "
            + "it does not exist anymore or either the old database or the new "
            + "database don't exist anymore.", qualify(oldTTableName),
            qualify(newTTableName));
      } else {
        infoLog("Renamed old table {} to new table {}.", qualify(oldTTableName),
            qualify(newTTableName));
      }
    }

    /**
     * Detects a event sync flag was turned on in this event
     */
    private boolean wasEventSyncTurnedOn() {
      // the eventsync flag was not changed
      if (Objects.equals(eventSyncBeforeFlag_, eventSyncAfterFlag_)) return false;
      // eventSync after flag is null or if it is explicitly set to false
      if ((eventSyncAfterFlag_ == null && !dbFlagVal) || !eventSyncAfterFlag_) {
        return true;
      }
      return false;
    }

    @Override
    protected boolean canBeSkipped() {
      // Certain alter events just modify some parameters such as
      // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
      // along with insert events. Check if the alter table event is such a trivial
      // event by setting those parameters equal before and after the event and
      // comparing the objects.

      // Avoid modifying the object from event.
      org.apache.hadoop.hive.metastore.api.Table tblAfter = tableAfter_.deepCopy();
      setTrivialParameters(tableBefore_.getParameters(), tblAfter.getParameters());
      return tblAfter.equals(tableBefore_);
    }

    private String qualify(TTableName tTableName) {
      return new TableName(tTableName.db_name, tTableName.table_name).toString();
    }

    /**
     * In case of alter table events, it is possible that the alter event is generated
     * because user changed the value of the parameter
     * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code>. If the
     * parameter is unchanged, it doesn't
     * matter if you use the before or after table object here since the eventual action
     * is going be invalidate or rename. If however, the parameter is changed, couple of
     * things could happen. The flag changes from unset/false to true or it changes from
     * true to false/unset. In the first case, we want to process the event (and ignore
     * subsequent events on this table). In the second case, we should process the event
     * (as well as all the subsequent events on the table). So we always process this
     * event when the value of the flag is changed.
     *
     * @return true, if this event needs to be skipped. false if this event needs to be
     *     processed.
     */
    @Override
    protected boolean isEventProcessingDisabled() {
      // if the event sync flag was changed then we always process this event
      if (!Objects.equals(eventSyncBeforeFlag_, eventSyncAfterFlag_)) {
        infoLog("Before flag value {} after flag value {} changed for table {}",
            eventSyncBeforeFlag_, eventSyncAfterFlag_, getFullyQualifiedTblName());
        return false;
      }
      // flag is unchanged, use the default impl from base class
      return super.isEventProcessingDisabled();
    }
  }

  /**
   * MetastoreEvent for the DROP_TABLE event type
   */
  public static class DropTableEvent extends MetastoreTableEvent {

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private DropTableEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(eventType_));
      JSONDropTableMessage dropTableMessage =
          (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
              .getDropTableMessage(event.getMessage());
      try {
        msTbl_ = Preconditions.checkNotNull(dropTableMessage.getTableObj());
      } catch (Exception e) {
        throw new MetastoreNotificationException(debugString(
            "Could not parse event message. "
                + "Check if %s is set to true in metastore configuration",
            MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
      }
    }

    /**
     * Process the drop table event type. If the table from the event doesn't exist in the
     * catalog, ignore the event. If the table exists in the catalog, compares the
     * createTime of the table in catalog with the createTime of the table from the event
     * and remove the catalog table if there is a match. If the catalog table is a
     * incomplete table it is removed as well. The creation_time from HMS is unfortunately
     * in seconds granularity, which means there is a limitation that we cannot
     * distinguish between tables which are created with the same name within a second.
     * So a sequence of create_table, drop_table, create_table happening within the
     * same second might cause false positives on drop_table event processing. This is
     * not a huge problem since the tables will eventually be created when the
     * create events are processed but there will be a non-zero amount of time when the
     * table will not be existing in catalog.
     * TODO : Once HIVE-21595 is available we should rely on table_id for determining a
     * newer incarnation of a previous table.
     */
    @Override
    public void process() {
      Reference<Boolean> tblWasFound = new Reference<>();
      Reference<Boolean> tblMatched = new Reference<>();
      Table removedTable = catalog_.removeTableIfExists(msTbl_, tblWasFound, tblMatched);
      if (removedTable != null) {
        infoLog("Removed table {} ", getFullyQualifiedTblName());
      } else if (!tblWasFound.getRef()) {
        debugLog("Table {} was not removed since it did not exist in catalog.",
            tblName_);
      } else if (!tblMatched.getRef()) {
        infoLog(debugString("Table %s was not removed from "
            + "catalog since the creation time of the table did not match", tblName_));
        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
      }
    }
  }

  /**
   * MetastoreEvent for CREATE_DATABASE event type
   */
  public static class CreateDatabaseEvent extends MetastoreDatabaseEvent {

    // metastore database object as parsed from NotificationEvent message
    private final Database createdDatabase_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private CreateDatabaseEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.CREATE_DATABASE.equals(eventType_));
      JSONCreateDatabaseMessage createDatabaseMessage =
          (JSONCreateDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
              .getCreateDatabaseMessage(event.getMessage());
      try {
        createdDatabase_ =
            Preconditions.checkNotNull(createDatabaseMessage.getDatabaseObject());
      } catch (Exception e) {
        throw new MetastoreNotificationException(debugString(
            "Database object is null in the event. "
                + "This could be a metastore configuration problem. "
                + "Check if %s is set to true in metastore configuration",
            MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
      }
    }

    /**
     * Processes the create database event by adding the Db object from the event if it
     * does not exist in the catalog already.
     */
    @Override
    public void process() {
      // if the database already exists in catalog, by definition, it is a later version
      // of the database since metastore will not allow it be created if it was already
      // existing at the time of creation. In such case, it is safe to assume that the
      // already existing database in catalog is a later version with the same name and
      // this event can be ignored
      if (catalog_.addDbIfNotExists(dbName_, createdDatabase_)) {
        infoLog("Successfully added database {}", dbName_);
      } else {
        infoLog("Database {} already exists", dbName_);
      }
    }

    @Override
    public boolean isRemovedAfter(List<MetastoreEvent> events) {
      Preconditions.checkNotNull(events);
      for (MetastoreEvent event : events) {
        if (event.eventType_.equals(MetastoreEventType.DROP_DATABASE)) {
          DropDatabaseEvent dropDatabaseEvent = (DropDatabaseEvent) event;
          if (dbName_.equalsIgnoreCase(dropDatabaseEvent.dbName_)) {
            infoLog("Found database {} is removed later in event {} of type {} ", dbName_,
                dropDatabaseEvent.eventId_, dropDatabaseEvent.eventType_);
            return true;
          }
        }
      }
      return false;
    }
  }

  /**
   * MetastoreEvent for ALTER_DATABASE event type
   */
  public static class AlterDatabaseEvent extends MetastoreDatabaseEvent {
    // metastore database object as parsed from NotificationEvent message
    private final Database alteredDatabase_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private AlterDatabaseEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.ALTER_DATABASE.equals(eventType_));
      JSONAlterDatabaseMessage alterDatabaseMessage =
          (JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
              .getAlterDatabaseMessage(event.getMessage());
      try {
        alteredDatabase_ =
            Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
      } catch (Exception e) {
        throw new MetastoreNotificationException(
            debugString("Unable to parse the alter database message"), e);
      }
    }

    /**
     * Processes the alter database event by replacing the catalog cached Db object with
     * the Db object from the event
     */
    @Override
    public void process() throws CatalogException, MetastoreNotificationException {
      if (isSelfEvent()) {
        infoLog("Not processing the event as it is a self-event");
        return;
      }
      Preconditions.checkNotNull(alteredDatabase_);
      // If not self event, copy Db object from event to catalog
      if (!catalog_.updateDbIfExists(alteredDatabase_)) {
        // Okay to skip this event. Events processor will not error out.
        debugLog("Update database {} failed as the database is not present in the "
            + "catalog.", alteredDatabase_.getName());
      } else {
        infoLog("Database {} updated after alter database event.",
            alteredDatabase_.getName());
      }
    }

    @Override
    protected void initSelfEventIdentifiersFromEvent() {
      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
          alteredDatabase_.getParameters(),
          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
      serviceIdFromEvent_ = getStringProperty(
          alteredDatabase_.getParameters(),
          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
      try {
        pendingVersionNumbersFromCatalog_ =
            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
      } catch (DatabaseNotFoundException | TableNotFoundException e) {
        // ok to ignore this exception, since if the db doesn't exit, this event needs
        // to be ignored anyways
        debugLog("Received exception {}. Ignoring self-event evaluation", e.getMessage());
      }
    }
  }

  /**
   * MetastoreEvent for the DROP_DATABASE event
   */
  public static class DropDatabaseEvent extends MetastoreDatabaseEvent {

    // Metastore database object as parsed from NotificationEvent message
    private final Database droppedDatabase_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private DropDatabaseEvent(
        CatalogServiceCatalog catalog, Metrics metrics, NotificationEvent event)
        throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkArgument(MetastoreEventType.DROP_DATABASE.equals(eventType_));
      JSONDropDatabaseMessage dropDatabaseMessage =
          (JSONDropDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
              .getDropDatabaseMessage(event.getMessage());
      try {
        droppedDatabase_ =
            Preconditions.checkNotNull(dropDatabaseMessage.getDatabaseObject());
      } catch (Exception e) {
        throw new MetastoreNotificationException(debugString(
            "Database object is null in the event. "
                + "This could be a metastore configuration problem. "
                + "Check if %s is set to true in metastore configuration",
            MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
      }
    }

    /**
     * Process the drop database event. This handler removes the db object from catalog
     * only if the CREATION_TIME of the catalog's database object is lesser than or equal
     * to that of the database object present in the notification event. If the
     * CREATION_TIME of the catalog's DB object is greater than that of the notification
     * event's DB object, it means that the Database object present in the catalog is a
     * later version and we can skip the event. (For instance, when user does a create db,
     * drop db and create db again with the same dbName.).
     * The creation_time from HMS is unfortunately in seconds granularity, which means
     * there is a limitation that we cannot distinguish between databases which are
     * created with the same name within a second. So a sequence of create_database,
     * drop_database, create_database happening within the same second might cause
     * false positives on drop_database event processing. This is not a huge problem
     * since the databases will eventually be created when the create events are
     * processed but there will be a non-zero amount of time when the database will not
     * be existing in catalog.
     * TODO : Once HIVE-21595 is available we should rely on database_id for determining a
     * newer incarnation of a previous database.
     */
    @Override
    public void process() {
      Reference<Boolean> dbFound = new Reference<>();
      Reference<Boolean> dbMatched = new Reference<>();
      Db removedDb = catalog_.removeDbIfExists(droppedDatabase_, dbFound, dbMatched);
      if (removedDb != null) {
        infoLog("Removed Database {} ", dbName_);
      } else if (!dbFound.getRef()) {
        debugLog("Database {} was not removed since it " +
            "did not exist in catalog.", dbName_);
      } else if (!dbMatched.getRef()) {
        infoLog(debugString("Database %s was not removed from catalog since "
            + "the creation time of the Database did not match", dbName_));
        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
      }
    }
  }

  /**
   * MetastoreEvent for which issues invalidate on a table from the event
   */
  public static abstract class TableInvalidatingEvent extends MetastoreTableEvent {
    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private TableInvalidatingEvent(
        CatalogServiceCatalog catalog, Metrics metrics, NotificationEvent event) {
      super(catalog, metrics, event);
    }

    /**
     * Issues a invalidate table on the catalog on the table from the event. This
     * invalidate does not fetch information from metastore unlike the invalidate metadata
     * command since event is triggered post-metastore activity. This handler invalidates
     * by atomically removing existing loaded table and replacing it with a
     * IncompleteTable. If the table doesn't exist in catalog this operation is a no-op
     */
    @Override
    public void process() throws MetastoreNotificationException, CatalogException {
      // skip event processing in case its a self-event
      if (isSelfEvent()) {
        infoLog("Not processing the event as it is a self-event");
        return;
      }

      // Skip if it's only a change in parameters by Hive, which can be ignored.
      if (canBeSkipped()) {
        infoLog("Not processing this event as it only modifies some "
            + "parameters which can be ignored.");
        return;
      }

      if (invalidateCatalogTable()) {
        infoLog("Table {} is invalidated", getFullyQualifiedTblName());
      } else {
        debugLog("Table {} does not need to be invalidated since "
            + "it does not exist anymore", getFullyQualifiedTblName());
      }
    }

    protected static String getStringProperty(
        Map<String, String> params, String key, String defaultVal) {
      if (params == null) return defaultVal;
      return params.getOrDefault(key, defaultVal);
    }

    /**
     * Returns a list of parameters that are set by Hive for tables/partitions that can be
     * ignored to determine if the alter table/partition event is a trivial one.
     */
    static final List<String> parametersToIgnore = new ImmutableList.Builder<String>()
        .add("transient_lastDdlTime")
        .add("totalSize")
        .add("numFilesErasureCoded")
        .add("numFiles")
        .build();

    /**
     * Util method that sets the parameters that can be ignored equal before and after
     * event.
     */
     static void setTrivialParameters(Map<String, String> parametersBefore,
        Map<String, String> parametersAfter) {
      for (String parameter: parametersToIgnore) {
        String val = parametersBefore.get(parameter);
        if (val == null) {
          parametersAfter.remove(parameter);
        } else {
          parametersAfter.put(parameter, val);
        }
      }
    }

    /**
     * Hive generates certain trivial alter events for eg: change only
     * "transient_lastDdlTime". This method returns true if the alter partition event is
     * such a trivial event.
     */
    protected abstract boolean canBeSkipped();
  }

  public static class AddPartitionEvent extends TableInvalidatingEvent {
    private Partition lastAddedPartition_;
    private final List<Partition> addedPartitions_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private AddPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkState(eventType_.equals(MetastoreEventType.ADD_PARTITION));
      if (event.getMessage() == null) {
        throw new IllegalStateException(debugString("Event message is null"));
      }
      try {
        AddPartitionMessage addPartitionMessage_ =
            MetastoreEventsProcessor.getMessageDeserializer()
                .getAddPartitionMessage(event.getMessage());
        addedPartitions_ =
            Lists.newArrayList(addPartitionMessage_.getPartitionObjs());
        // it is possible that the added partitions is empty in certain cases. See
        // IMPALA-8847 for example
        if (!addedPartitions_.isEmpty()) {
          // when multiple partitions are added in HMS they are all added as one
          // transaction Hence all the partitions which are present in the message must
          // have the same serviceId and version if it is set. hence it is fine to just
          // look at the last added partition in the list and use it for the self-event
          // ids
          lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
        }
        msTbl_ = addPartitionMessage_.getTableObj();
      } catch (Exception ex) {
        throw new MetastoreNotificationException(ex);
      }
    }

    @Override
    public void process() throws MetastoreNotificationException, CatalogException {
      // bail out early if there are not partitions to process
      if (addedPartitions_.isEmpty()) {
        infoLog("Partition list is empty. Ignoring this event.");
        return;
      }
      if (isSelfEvent()) {
        infoLog("Not processing the event as it is a self-event");
        return;
      }
      try {
        // Reload the whole table if it's a transactional table.
        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
          reloadTableFromCatalog("ADD_PARTITION");
        } else {
          boolean success = true;
          // HMS adds partitions in a transactional way. This means there may be multiple
          // HMS partition objects in an add_partition event. We try to do the same here
          // by refreshing all those partitions in a loop. If any partition refresh fails,
          // we throw MetastoreNotificationNeedsInvalidateException exception. We skip
          // refresh of the partitions if the table is not present in the catalog.
          infoLog("Trying to refresh {} partitions added to table {} in the event",
              addedPartitions_.size(), getFullyQualifiedTblName());
          for (Partition partition : addedPartitions_) {
            List<TPartitionKeyValue> tPartSpec =
                getTPartitionSpecFromHmsPartition(msTbl_, partition);
            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
                "processing ADD_PARTITION event from HMS")) {
              debugLog("Refresh partitions on table {} failed as table was not present " +
                  "in the catalog.", getFullyQualifiedTblName());
              success = false;
              break;
            }
          }
          if (success) {
            infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
                getFullyQualifiedTblName());
          }
        }
      } catch (DatabaseNotFoundException e) {
        debugLog("Refresh partitions on table {} after add_partitions event failed as "
                + "the database was not present in the catalog.",
            getFullyQualifiedTblName());
      } catch (CatalogException e) {
        throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
                + "refresh newly added partitions of table {}. Event processing cannot "
                + "continue. Issue an invalidate command to reset event processor.",
            getFullyQualifiedTblName()), e);
      }
    }

    @Override
    protected void initSelfEventIdentifiersFromEvent() {
      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
          lastAddedPartition_.getParameters(),
          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
      serviceIdFromEvent_ = getStringProperty(
          lastAddedPartition_.getParameters(),
          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
      try {
        pendingVersionNumbersFromCatalog_ =
            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
      } catch (DatabaseNotFoundException | TableNotFoundException e) {
        // ok to ignore this exception, since if the db doesn't exit, this event needs
        // to be ignored anyways
        debugLog("Received exception {}. Ignoring self-event evaluation",
            e.getMessage());
      }
    }

    @Override
    protected boolean canBeSkipped() { return false; }
  }

  public static class AlterPartitionEvent extends TableInvalidatingEvent {
    // the Partition object before alter operation, as parsed from the NotificationEvent
    private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore_;
    // the Partition object after alter operation, as parsed from the NotificationEvent
    private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private AlterPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkState(eventType_.equals(MetastoreEventType.ALTER_PARTITION));
      Preconditions.checkNotNull(event.getMessage());
      AlterPartitionMessage alterPartitionMessage =
          MetastoreEventsProcessor.getMessageDeserializer()
              .getAlterPartitionMessage(event.getMessage());

      try {
        partitionBefore_ =
            Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
        partitionAfter_ =
            Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
        msTbl_ = alterPartitionMessage.getTableObj();
      } catch (Exception e) {
        throw new MetastoreNotificationException(
            debugString("Unable to parse the alter partition message"), e);
      }
    }

    @Override
    public void process() throws MetastoreNotificationException, CatalogException {
      if (isSelfEvent()) {
        infoLog("Not processing the event as it is a self-event");
        return;
      }

      // Ignore the event if this is a trivial event. See javadoc for
      // isTrivialAlterPartitionEvent() for examples.
      if (canBeSkipped()) {
        infoLog("Not processing this event as it only modifies some partition "
            + "parameters which can be ignored.");
        return;
      }

      // Reload the whole table if it's a transactional table.
      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
        reloadTableFromCatalog("ALTER_PARTITION");
      } else {
        // Refresh the partition that was altered.
        Preconditions.checkNotNull(partitionAfter_);
        List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
            partitionAfter_);
        try {
          // Ignore event if table or database is not in catalog. Throw exception if
          // refresh fails.
          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
              "processing ALTER_PARTITION event from HMS")) {
            debugLog("Refresh of table {} partition {} failed as the table "
                    + "is not present in the catalog.", getFullyQualifiedTblName(),
                constructPartitionStringFromTPartitionSpec(tPartSpec));
          } else {
            infoLog("Table {} partition {} has been refreshed",
                getFullyQualifiedTblName(),
                constructPartitionStringFromTPartitionSpec(tPartSpec));
          }
        } catch (DatabaseNotFoundException e) {
          debugLog("Refresh of table {} partition {} "
                  + "event failed as the database is not present in the catalog.",
              getFullyQualifiedTblName(),
              constructPartitionStringFromTPartitionSpec(tPartSpec));
        } catch (CatalogException e) {
          throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                  + "partition on table {} partition {} failed. Event processing cannot "
                  + "continue. Issue and invalidate command to reset the event processor "
                  + "state.", getFullyQualifiedTblName(),
              constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
        }
      }
    }

    @Override
    protected boolean canBeSkipped() {
      // Certain alter events just modify some parameters such as
      // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
      // along with insert events. Check if the alter table event is such a trivial
      // event by setting those parameters equal before and after the event and
      // comparing the objects.

      // Avoid modifying the object from event.
      Partition afterPartition = partitionAfter_.deepCopy();
      setTrivialParameters(partitionBefore_.getParameters(),
          afterPartition.getParameters());
      return afterPartition.equals(partitionBefore_);
    }

    @Override
    protected void initSelfEventIdentifiersFromEvent() {
      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
          partitionAfter_.getParameters(),
          MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
      serviceIdFromEvent_ = getStringProperty(
          partitionAfter_.getParameters(),
          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
      try {
        pendingVersionNumbersFromCatalog_ =
            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
      } catch (DatabaseNotFoundException | TableNotFoundException e) {
        // ok to ignore since if the db or table doesn't exist the event is ignored
        // anyways
        debugLog("Received exception {}. Ignoring self-event evaluation",
            e.getMessage());
      }
    }
  }

  public static class DropPartitionEvent extends TableInvalidatingEvent {
    private final List<Map<String, String>> droppedPartitions_;

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private DropPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
        NotificationEvent event) throws MetastoreNotificationException {
      super(catalog, metrics, event);
      Preconditions.checkState(eventType_.equals(MetastoreEventType.DROP_PARTITION));
      Preconditions.checkNotNull(event.getMessage());
      DropPartitionMessage dropPartitionMessage =
          MetastoreEventsProcessor.getMessageDeserializer()
              .getDropPartitionMessage(event.getMessage());
      try {
        msTbl_ = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
        droppedPartitions_ = dropPartitionMessage.getPartitions();
        Preconditions.checkNotNull(droppedPartitions_);
      } catch (Exception ex) {
        throw new MetastoreNotificationException(
            debugString("Could not parse event message. "
                    + "Check if %s is set to true in metastore configuration",
                MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY),
            ex);
      }
    }

    @Override
    public void process() throws MetastoreNotificationException {
      // we have seen cases where a add_partition event is generated with empty
      // partition list (see IMPALA-8547 for details. Make sure that droppedPartitions
      // list is not empty
      if (droppedPartitions_.isEmpty()) {
        infoLog("Partition list is empty. Ignoring this event.");
      }
      // We do not need self event as dropPartition() call is a no-op if the directory
      // doesn't exist.
      try {
        // Reload the whole table if it's a transactional table.
        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
          reloadTableFromCatalog("DROP_PARTITION");
        } else {
          boolean success = true;
          // We refresh all the partitions that were dropped from HMS. If a refresh
          // fails, we throw a MetastoreNotificationNeedsInvalidateException
          infoLog("{} partitions dropped from table {}. Trying "
              + "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
          for (Map<String, String> partSpec : droppedPartitions_) {
            List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
            for (Map.Entry<String, String> entry : partSpec.entrySet()) {
              tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
            }
            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
                "processing DROP_PARTITION event from HMS")) {
              debugLog("Could not refresh partition {} of table {} as table "
                      + "was not present in the catalog.",
                      getFullyQualifiedTblName());
              success = false;
              break;
            }
          }
          if (success) {
            infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
                getFullyQualifiedTblName());
          }
        }
      } catch (DatabaseNotFoundException e) {
        debugLog("Could not refresh partitions of table {}"
            + "as database was not present in the catalog.", getFullyQualifiedTblName());
      } catch (CatalogException e) {
        throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
            + "drop some partitions from table {} after a drop partitions event. Event "
                + "processing cannot continue. Issue an invalidate command to reset "
            + "event processor state.", getFullyQualifiedTblName()), e);
      }
    }

    @Override
    protected void initSelfEventIdentifiersFromEvent() {
      // no-op
    }

    @Override
    protected boolean canBeSkipped() { return false; }
  }

  /**
   * An event type which is ignored. Useful for unsupported metastore event types
   */
  public static class IgnoredEvent extends MetastoreEvent {

    /**
     * Prevent instantiation from outside should use MetastoreEventFactory instead
     */
    private IgnoredEvent(
        CatalogServiceCatalog catalog, Metrics metrics, NotificationEvent event) {
      super(catalog, metrics, event);
    }

    @Override
    public void process() {
      debugLog("Ignored");
    }

    @Override
    protected boolean isEventProcessingDisabled() {
      return false;
    }
  }
}
