// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.catalog.events;


import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.common.Metrics;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A metastore event is a instance of the class
 * <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. Metastore can be
 * configured, to work with Listeners which are called on various DDL operations like
 * create/alter/drop operations on database, table, partition etc. Each event has a unique
 * incremental id and the generated events are be fetched from Metastore to get
 * incremental updates to the metadata stored in Hive metastore using the the public API
 * <code>get_next_notification</code> These events could be generated by external
 * Metastore clients like Apache Hive or Apache Spark as well as other Impala clusters
 * configured to talk with the same metastore.
 *
 * This class is used to poll metastore for such events at a given frequency. By observing
 * such events, we can take appropriate action on the catalogD
 * (refresh/invalidate/add/remove) so that catalog represents the latest information
 * available in metastore. We keep track of the last synced event id in each polling
 * iteration so the next batch can be requested appropriately. The current batch size is
 * constant and set to MAX_EVENTS_PER_RPC.
 *
 * <pre>
 *      +---------------+   +----------------+        +--------------+
 *      |Catalog state  |   |Catalog         |        |              |
 *      |stale          |   |State up-to-date|        |Catalog state |
 *      |(apply event)  |   |(ignore)        |        |is newer than |
 *      |               |   |                |        |event         |
 *      |               |   |                |        |(ignore)      |
 *      +------+--------+   +-----+----------+        +-+------------+
 *             |                  |                     |
 *             |                  |                     |
 *             |                  |                     |
 *             |                  |                     |
 *             |                  |                     |
 * +-----------V------------------V---------------------V----------->  Event Timeline
 *                                ^
 *                                |
 *                                |
 *                                |
 *                                |
 *                                E
 *
 * </pre>
 * Consistency model: Events could be seen as DDLs operations from past either done from
 * this same cluster or some other external system. For example in the events timeline
 * given above, consider a Event E at any given time. The catalog state for the
 * corresponding object of the event could either be stale, exactly-same or at a version
 * which is higher than one provided by event. Catalog state should only be updated when
 * it is stale with respect to the event. In order to determine if the catalog object is
 * stale, we rely on a combination of creationTime and object version. A object in catalog
 * is stale if and only if its creationTime is < creationTime of the object from event OR
 * its version < version from event if createTime matches
 *
 * If the object has the same createTime and version when compared to event or if the
 * createTime > createTime from the event, the event can be safely ignored.
 *
 * Following table shows the actions to be taken when the catalog state is stale.
 *
 * <pre>
 *               +----------------------------------------+
 *               |    Catalog object state                |
 * +----------------------------+------------+------------+
 * | Event type  | Loaded       | Incomplete | Not present|
 * |             |              |            |            |
 * +------------------------------------------------------+
 * |             |              |            |            |
 * | CREATE EVENT| removeAndAdd | Ignore     | Add        |
 * |             |              |            |            |
 * |             |              |            |            |
 * | ALTER EVENT | Invalidate   | Ignore     | Ignore     |
 * |             |              |            |            |
 * |             |              |            |            |
 * | DROP EVENT  | Remove       | Remove     | Ignore     |
 * |             |              |            |            |
 * |             |              |            |            |
 * | INSERT EVENT| Refresh      | Ignore     | Ignore     |
 * |             |              |            |            |
 * +-------------+--------------+------------+------------+
 * </pre>
 *
 * Currently event handlers rely on creation time on Database, Table and Partition to
 * uniquely determine if the object from event is same as object in the catalog. This
 * information is used to make sure that we are deleting the right incarnation of the
 * object when compared to Metastore.
 *
 * Self-events:
 * Events could be generated by this Catalog's operations as well. Invalidating table
 * for such events is unnecessary and inefficient. In order to detect such self-events
 * when catalog executes a DDL operation it appends the current catalog version to the
 * list of version numbers for the in-flight events for the table. Events processor
 * clears this version when the corresponding version number identified by serviceId is
 * received in the event. This is needed since it is possible that a external
 * non-Impala system which generates the event presents the same serviceId and version
 * number later on. The algorithm to detect a self-event is as below.
 *
 * 1. Add the service id and expected catalog version to table/partition parameters
 * when executing the DDL operation. When the HMS operation is successful, add the
 * version number to the list of version for in-flight events at table level.
 * 2. When the event is received, the first time you see the combination of serviceId
 * and version number, event processor clears the version number from table's list and
 * determines the event as self-generated (and hence ignored)
 * 3. If the event data presents a unknown serviceId or if the version number is not
 * present in the list of in-flight versions, event is not a self-event and needs to be
 * processed.
 *
 * In order to limit the total memory footprint, only 10 version numbers are stored at
 * the table. Since the event processor is expected to poll every few seconds this
 * should be a reasonable bound which satisfies most use-cases. Otherwise, event
 * processor may wrongly process a self-event to invalidate the table. In such a case,
 * its a performance penalty not a correctness issue.
 *
 * All the operations which change the state of catalog cache while processing a certain
 * event type must be atomic in nature. We rely on taking a write lock on version object
 * in CatalogServiceCatalog to make sure that readers are blocked while the metadata
 * update operation is being performed. Since the events are generated post-metastore
 * operations, such catalog updates do not need to update the state in Hive Metastore.
 *
 * Error Handling: The event processor could be in ACTIVE, PAUSED, ERROR states. In case
 * of any errors while processing the events the state of event processor changes to ERROR
 * and no subsequent events are polled. In such a case a invalidate metadata command
 * restarts the event polling which updates the lastSyncedEventId to the latest from
 * metastore.
 */
public class MetastoreEventsProcessor implements ExternalEventsProcessor {

  public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
      "hive.metastore.notifications.add.thrift.objects";

  private static final Logger LOG =
      LoggerFactory.getLogger(MetastoreEventsProcessor.class);

  private static final MessageDeserializer MESSAGE_DESERIALIZER =
      MetastoreShim.getMessageDeserializer();

  private static MetastoreEventsProcessor instance;

  // maximum number of events to poll in each RPC
  private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;

  // maximum time to wait for a clean shutdown of scheduler in seconds
  private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10;

  // time taken to fetch events during each poll
  public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration";
  // time taken to apply all the events received duration each poll
  public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration";
  // rate of events received per unit time
  public static final String EVENTS_RECEIVED_METRIC = "events-received";
  // total number of events which are skipped because of the flag setting
  public static final String EVENTS_SKIPPED_METRIC = "events-skipped";
  // name of the event processor status metric
  public static final String STATUS_METRIC = "status";
  // last synced event id
  public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
  // metric name which counts the number of self-events which are skipped
  public static final String NUMBER_OF_SELF_EVENTS = "self-events-skipped";
  // metric name for number of tables which are refreshed by event processor so far
  public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
  // number of times events processor refreshed a partition
  public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed";

  // possible status of event processor
  public enum EventProcessorStatus {
    PAUSED, // event processor is paused because catalog is being reset concurrently
    ACTIVE, // event processor is scheduled at a given frequency
    ERROR, // event processor is in error state and event processing has stopped
    NEEDS_INVALIDATE, // event processor could not resolve certain events and needs a
    // manual invalidate command to reset the state (See AlterEvent for a example)
    STOPPED, // event processor is shutdown. No events will be processed
    DISABLED // event processor is not configured to run
  }

  // current status of this event processor
  private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED;

  // event factory which is used to get or create MetastoreEvents
  private final MetastoreEventFactory metastoreEventFactory_;

  // keeps track of the last event id which we have synced to
  private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);

  // polling interval in seconds. Note this is a time we wait AFTER each fetch call
  private final long pollingFrequencyInSec_;

  // catalog service instance to be used while processing events
  private final CatalogServiceCatalog catalog_;

  // scheduler daemon thread executor for processing events at given frequency
  private final ScheduledExecutorService scheduler_ = Executors
      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
          .setNameFormat("MetastoreEventsProcessor").build());

  // metrics registry to keep track of metrics related to event processing
  //TODO create a separate singleton class which wraps around this so that we don't
  // have to pass it around as a argument in constructor in MetastoreEvents
  private final Metrics metrics_ = new Metrics();

  @VisibleForTesting
  MetastoreEventsProcessor(CatalogServiceCatalog catalog, long startSyncFromId,
      long pollingFrequencyInSec) throws CatalogException {
    Preconditions.checkState(pollingFrequencyInSec > 0);
    this.catalog_ = Preconditions.checkNotNull(catalog);
    validateConfigs();
    lastSyncedEventId_.set(startSyncFromId);
    metastoreEventFactory_ = new MetastoreEventFactory(catalog_, metrics_);
    pollingFrequencyInSec_ = pollingFrequencyInSec;
    initMetrics();
  }

  /**
   * Fetches the required metastore config values and validates them against the
   * expected values. The configurations to validate are different for HMS-2 v/s HMS-3
   * and hence it uses MetastoreShim to get the configurations which need to be validated.
   * @throws CatalogException if one or more validations fail or if metastore is not
   * accessible
   */
  @VisibleForTesting
  void validateConfigs() throws CatalogException {
    List<ValidationResult> validationErrors = new ArrayList<>();
    for (MetastoreEventProcessorConfig config : getEventProcessorConfigsToValidate()) {
      String configKey = config.getValidator().getConfigKey();
      try {
        String value = getConfigValueFromMetastore(configKey, "");
        ValidationResult result = config.validate(value);
        if (!result.isValid()) validationErrors.add(result);
      } catch (TException e) {
        String msg = String.format("Unable to get configuration %s from metastore. Check "
            + "if metastore is accessible", configKey);
        LOG.error(msg, e);
        throw new CatalogException(msg);
      }
      if (!validationErrors.isEmpty()) {
        LOG.error("Found {} incorrect metastore configuration(s).",
            validationErrors.size());
        for (ValidationResult invalidConfig: validationErrors) {
          LOG.error(invalidConfig.getReason());
        }
        throw new CatalogException(String.format("Found %d incorrect metastore "
            + "configuration(s). Events processor cannot start. See ERROR log for more "
            + "details.", validationErrors.size()));
      }
    }
  }

  /**
   * Returns the list of Metastore configurations to validate depending on the hive
   * version
   */
  public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() {
    if (MetastoreShim.getMajorVersion() >= 2) {
      return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML);
    }
    return Arrays.asList(MetastoreEventProcessorConfig.values());
  }

  private void initMetrics() {
    metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC);
    metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC);
    metrics_.addMeter(EVENTS_RECEIVED_METRIC);
    metrics_.addCounter(EVENTS_SKIPPED_METRIC);
    metrics_.addGauge(STATUS_METRIC,
        (Gauge<String>) () -> getStatus().toString());
    metrics_.addGauge(LAST_SYNCED_ID_METRIC,
        (Gauge<Long>) () -> lastSyncedEventId_.get());
    metrics_.addCounter(NUMBER_OF_SELF_EVENTS);
    metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
    metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
  }

  /**
   * Schedules the daemon thread at a given frequency. It is important to note that this
   * method schedules with FixedDelay instead of FixedRate. The reason it is scheduled at
   * a fixedDelay is to make sure that we don't pile up the pending tasks in case each
   * polling operation is taking longer than the given frequency. Because of the fixed
   * delay, the new poll operation is scheduled at the time when previousPoll operation
   * completes + givenDelayInSec
   */
  @Override
  public synchronized void start() {
    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE);
    startScheduler();
    updateStatus(EventProcessorStatus.ACTIVE);
    LOG.info(String.format("Successfully started metastore event processing."
        + " Polling interval: %d seconds.", pollingFrequencyInSec_));
  }

  /**
   * Gets the current event processor status
   */
  public EventProcessorStatus getStatus() {
    return eventProcessorStatus_;
  }

  /**
   * Returns the value for a given config key from Hive Metastore.
   * @param config Hive configuration name
   * @param defaultVal Default value to return if config not present in Hive
   */
  @VisibleForTesting
  public String getConfigValueFromMetastore(String config, String defaultVal)
      throws TException {
    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
      IMetaStoreClient iMetaStoreClient = metaStoreClient.getHiveClient();
      return MetaStoreUtil.getMetastoreConfigValue(iMetaStoreClient, config, defaultVal);
    }
  }
  /**
   * returns the current value of LastSyncedEventId. This method is not thread-safe and
   * only to be used for testing purposes
   */
  @VisibleForTesting
  public long getLastSyncedEventId() {
    return lastSyncedEventId_.get();
  }

  @VisibleForTesting
  void startScheduler() {
    Preconditions.checkState(pollingFrequencyInSec_ > 0);
    LOG.info(String.format("Starting metastore event polling with interval %d seconds.",
        pollingFrequencyInSec_));
    scheduler_.scheduleWithFixedDelay(this::processEvents, pollingFrequencyInSec_,
        pollingFrequencyInSec_, TimeUnit.SECONDS);
  }

  /**
   * Stops the event processing and changes the status of event processor to
   * <code>EventProcessorStatus.PAUSED</code>. No new events will be processed as long
   * the status is stopped. If this event processor is actively processing events when
   * stop is called, this method blocks until the current processing is complete
   */
  @Override
  public synchronized void pause() {
    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.PAUSED);
    updateStatus(EventProcessorStatus.PAUSED);
    LOG.info(String.format("Event processing is paused. Last synced event id is %d",
        lastSyncedEventId_.get()));
  }

  /**
   * Get the current notification event id from metastore
   */
  @Override
  public long getCurrentEventId() throws CatalogException {
    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
      return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
    } catch (TException e) {
      throw new CatalogException("Unable to fetch the current notification event id. "
          + "Check if metastore service is accessible");
    }
  }

  /**
   * Starts the event processor from a given event id
   */
  @Override
  public synchronized void start(long fromEventId) {
    Preconditions.checkArgument(fromEventId >= 0);
    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE,
        "Event processing start called when it is already active");
    long prevLastSyncedEventId = lastSyncedEventId_.get();
    lastSyncedEventId_.set(fromEventId);
    updateStatus(EventProcessorStatus.ACTIVE);
    LOG.info(String.format(
        "Metastore event processing restarted. Last synced event id was updated "
            + "from %d to %d", prevLastSyncedEventId, lastSyncedEventId_.get()));
  }

  /**
   * Stops the event processing and shuts down the scheduler. In case there is a batch of
   * events which is being processed currently, the
   * <code>processEvents</code> method releases lock after every event is processed.
   * Hence, it is possible that at-least 1 event from the batch be
   * processed while shutdown() waits to acquire lock on this object.
   */
  @Override
  public synchronized void shutdown() {
    Preconditions.checkNotNull(scheduler_);
    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.STOPPED,
        "Event processing is already stopped");
    shutdownAndAwaitTermination();
    updateStatus(EventProcessorStatus.STOPPED);
    LOG.info("Metastore event processing stopped.");
  }

  /**
   * Attempts to cleanly shutdown the scheduler pool. If the pool does not shutdown
   * within timeout, does a force shutdown which might interrupt currently running tasks.
   */
  private synchronized void shutdownAndAwaitTermination() {
    scheduler_.shutdown(); // disable new tasks from being submitted
    try {
      // wait for 10 secs for scheduler to complete currently running tasks
      if (!scheduler_.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
        // executor couldn't terminate and timed-out, force the termination
        LOG.info(String.format("Scheduler pool did not terminate within %d seconds. "
            + "Attempting to stop currently running tasks", SCHEDULER_SHUTDOWN_TIMEOUT));
        scheduler_.shutdownNow();
      }
    } catch (InterruptedException e) {
      // current thread interrupted while pool was waiting for termination
      // issue a shutdownNow before returning to cancel currently running tasks
      LOG.info("Received interruptedException. Terminating currently running tasks.", e);
      scheduler_.shutdownNow();
    }
  }

  /**
   * Fetch the next batch of NotificationEvents from metastore. The default batch size if
   * <code>EVENTS_BATCH_SIZE_PER_RPC</code>
   */
  @VisibleForTesting
  protected List<NotificationEvent> getNextMetastoreEvents()
      throws MetastoreNotificationFetchException {
    final Timer.Context context = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time();
    long lastSyncedEventId = lastSyncedEventId_.get();
    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
      // fetch the current notification event id. We assume that the polling interval
      // is small enough that most of these polling operations result in zero new
      // events. In such a case, fetching current notification event id is much faster
      // (and cheaper on HMS side) instead of polling for events directly
      CurrentNotificationEventId currentNotificationEventId =
          msClient.getHiveClient().getCurrentNotificationEventId();
      long currentEventId = currentNotificationEventId.getEventId();

      // no new events since we last polled
      if (currentEventId <= lastSyncedEventId) {
        return Collections.emptyList();
      }

      NotificationEventResponse response = msClient.getHiveClient()
          .getNextNotification(lastSyncedEventId, EVENTS_BATCH_SIZE_PER_RPC, null);
      LOG.info(String.format("Received %d events. Start event id : %d",
          response.getEvents().size(), lastSyncedEventId));
      return response.getEvents();
    } catch (TException e) {
      throw new MetastoreNotificationFetchException(
          "Unable to fetch notifications from metastore. Last synced event id is "
              + lastSyncedEventId, e);
    } finally {
      context.stop();
    }
  }

  /**
   * This method issues a request to Hive Metastore if needed, based on the current event
   * id in metastore and the last synced event_id. Events are fetched in fixed sized
   * batches. Each NotificationEvent received is processed by its corresponding
   * <code>MetastoreEvent</code>
   */
  @Override
  public void processEvents() {
    NotificationEvent lastProcessedEvent = null;
    try {
      EventProcessorStatus currentStatus = eventProcessorStatus_;
      if (currentStatus != EventProcessorStatus.ACTIVE) {
        LOG.warn(String.format(
            "Event processing is skipped since status is %s. Last synced event id is %d",
            currentStatus, lastSyncedEventId_.get()));
        return;
      }

      List<NotificationEvent> events = getNextMetastoreEvents();
      processEvents(events);
    } catch (MetastoreNotificationFetchException ex) {
      // No need to change the EventProcessor state to error since we want the
      // EventProcessor to continue getting new events after HMS is back up.
      LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore " +
        "may be unavailable. Will retry.", ex);
    } catch(MetastoreNotificationNeedsInvalidateException ex) {
      updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
      LOG.error("Event processing needs a invalidate command to resolve the state", ex);
    } catch (Exception ex) {
      // There are lot of Preconditions which can throw RuntimeExceptions when we
      // process events this catch all exception block is needed so that the scheduler
      // thread does not die silently
      updateStatus(EventProcessorStatus.ERROR);
      LOG.error("Unexpected exception received while processing event", ex);
      dumpEventInfoToLog(lastProcessedEvent);
    }
  }

  /**
   * Gets the current event processor metrics along with its status. If the status is
   * not active the metrics are skipped. Only the status is sent
   */
  @Override
  public TEventProcessorMetrics getEventProcessorMetrics() {
    TEventProcessorMetrics eventProcessorMetrics = new TEventProcessorMetrics();
    EventProcessorStatus currentStatus = getStatus();
    eventProcessorMetrics.setStatus(currentStatus.toString());
    eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId());
    if (currentStatus != EventProcessorStatus.ACTIVE) return eventProcessorMetrics;

    long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
    long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
    double avgFetchDuration =
        metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getMeanRate();
    double avgProcessDuration =
        metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getMeanRate();
    double avgNumberOfEventsReceived1Min =
        metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate();
    double avgNumberOfEventsReceived5Min =
        metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate();
    double avgNumberOfEventsReceived15Min =
        metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate();


    eventProcessorMetrics.setEvents_received(eventsReceived);
    eventProcessorMetrics.setEvents_skipped(eventsSkipped);
    eventProcessorMetrics.setEvents_fetch_duration_mean(avgFetchDuration);
    eventProcessorMetrics.setEvents_process_duration_mean(avgProcessDuration);
    eventProcessorMetrics.setEvents_received_1min_rate(avgNumberOfEventsReceived1Min);
    eventProcessorMetrics.setEvents_received_5min_rate(avgNumberOfEventsReceived5Min);
    eventProcessorMetrics.setEvents_received_15min_rate(avgNumberOfEventsReceived15Min);

    LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process "
            + "duration: {} Events received rate (1min) : {}",
        eventsReceived, eventsSkipped, avgFetchDuration, avgProcessDuration,
        avgNumberOfEventsReceived1Min);
    return eventProcessorMetrics;
  }

  @Override
  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
    TEventProcessorMetricsSummaryResponse summaryResponse =
        new TEventProcessorMetricsSummaryResponse();
    summaryResponse.setSummary(metrics_.toString());
    return summaryResponse;
  }

  @VisibleForTesting
  Metrics getMetrics() {
    return metrics_;
  }

  /**
   * Process the given list of notification events. Useful for tests which provide a list
   * of events
   */
  @VisibleForTesting
  protected void processEvents(List<NotificationEvent> events)
      throws MetastoreNotificationException {
    NotificationEvent lastProcessedEvent = null;
    // update the events received metric before returning
    metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
    if (events.isEmpty()) return;
    final Timer.Context context =
        metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
    try {
      List<MetastoreEvent> filteredEvents =
          metastoreEventFactory_.getFilteredEvents(events);
      if (filteredEvents.isEmpty()) {
        lastSyncedEventId_.set(events.get(events.size() - 1).getEventId());
        return;
      }
      for (MetastoreEvent event : filteredEvents) {
        // synchronizing each event processing reduces the scope of the lock so the a
        // potential reset() during event processing is not blocked for longer than
        // necessary
        synchronized (this) {
          if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
            break;
          }
          lastProcessedEvent = event.metastoreNotificationEvent_;
          event.processIfEnabled();
          lastSyncedEventId_.set(event.eventId_);
        }
      }
    } catch (CatalogException e) {
      throw new MetastoreNotificationException(String.format(
          "Unable to process event %d of type %s. Event processing will be stopped.",
          lastProcessedEvent.getEventId(), lastProcessedEvent.getEventType()), e);
    } finally {
      context.stop();
    }
  }

  /**
   * Updates the current states to the given status.
   */
  private synchronized void updateStatus(EventProcessorStatus toStatus) {
    eventProcessorStatus_ = toStatus;
  }

  private void dumpEventInfoToLog(NotificationEvent event) {
    if (event == null) {
      LOG.error("Notification event is null");
      return;
    }
    StringBuilder msg =
        new StringBuilder().append("Event id: ").append(event.getEventId()).append("\n")
            .append("Event Type: ").append(event.getEventType()).append("\n")
            .append("Event time: ").append(event.getEventTime()).append("\n")
            .append("Database name: ").append(event.getDbName()).append("\n");
    if (event.getTableName() != null) {
      msg.append("Table name: ").append(event.getTableName()).append("\n");
    }
    msg.append("Event message: ").append(event.getMessage()).append("\n");
    LOG.error(msg.toString());
  }

  /**
   * Create a instance of this object if it is not initialized. Currently, this object is
   * a singleton and should only be created during catalogD initialization time, so that
   * the start syncId matches with the catalogD startup time.
   *
   * @param catalog the CatalogServiceCatalog instance to which this event processing
   *     belongs
   * @param startSyncFromId Start event id. Events will be polled starting from this
   *     event id
   * @param eventPollingInterval HMS polling interval in seconds
   * @return this object is already created, or create a new one if it is not yet
   *     instantiated
   */
  public static synchronized ExternalEventsProcessor getInstance(
      CatalogServiceCatalog catalog, long startSyncFromId, long eventPollingInterval)
      throws CatalogException {
    if (instance != null) {
      return instance;
    }

    instance =
        new MetastoreEventsProcessor(catalog, startSyncFromId, eventPollingInterval);
    return instance;
  }

  @VisibleForTesting
  public MetastoreEventFactory getMetastoreEventFactory() {
    return metastoreEventFactory_;
  }

  public static MessageDeserializer getMessageDeserializer() {
    return MESSAGE_DESERIALIZER;
  }
}
