/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at
  <p>
  http://www.apache.org/licenses/LICENSE-2.0
  <p>
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

package org.apache.sentry.service.thrift;

import static com.codahale.metrics.MetricRegistry.name;
import static java.util.Collections.emptyMap;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;

import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.SentryMetrics;

import org.apache.thrift.TException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper class for {@code HiveMetaStoreClient}.
 *
 * <p>Abstracts communication with HMS and exposes APIs to connect to and disconnect from HMS,
 * to request full HMS snapshots, and to fetch new notifications.
 */
class SentryHMSClient implements AutoCloseable {

  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHMSClient.class);
  private static final String NOT_CONNECTED_MSG = "Client is not connected to HMS";

  private final Configuration conf;
  /** Underlying HMS client; {@code null} while not connected. */
  private HiveMetaStoreClient client = null;
  private final HiveConnectionFactory hiveConnectionFactory;

  private static final String SNAPSHOT = "snapshot";
  /** Measures time to get full snapshot. */
  private final Timer updateTimer = SentryMetrics.getInstance()
      .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
  /** Number of times update failed. */
  private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
      .getCounter(name(FullUpdateInitializer.class, "failed"));

  /**
   * Creates a client that is not yet connected; call {@link #connect()} before use.
   *
   * @param conf configuration handed to the full-snapshot initializer
   * @param hiveConnectionFactory factory used to create HMS connections
   */
  SentryHMSClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) {
    this.conf = conf;
    this.hiveConnectionFactory = hiveConnectionFactory;
  }

  /**
   * Used only for testing purposes.
   *
   * @param client HiveMetaStoreClient to be initialized
   */
  @VisibleForTesting
  void setClient(HiveMetaStoreClient client) {
    this.client = client;
  }

  /**
   * Used to know if the client is connected to HMS.
   *
   * @return true if the client is connected to HMS, false if the client is not connected.
   */
  boolean isConnected() {
    return client != null;
  }

  /**
   * Connects to HMS by creating HiveMetaStoreClient. Does nothing if a client already exists.
   *
   * @throws IOException          if could not establish connection
   * @throws InterruptedException if connection was interrupted
   * @throws MetaException        if other errors happened
   */
  void connect()
      throws IOException, InterruptedException, MetaException {
    if (client != null) {
      return;
    }
    client = hiveConnectionFactory.connect().getClient();
  }

  /**
   * Disconnects the HMS client.
   *
   * <p>Failures while closing are logged and not propagated; the client reference is always
   * cleared so that {@link #isConnected()} reports {@code false} afterwards.
   */
  public void disconnect() throws Exception {
    try {
      if (client != null) {
        LOGGER.info("Closing the HMS client connection");
        client.close();
      }
    } catch (Exception e) {
      LOGGER.error("failed to close Hive Connection Factory", e);
    } finally {
      client = null;
    }
  }

  /**
   * Closes the HMS client.
   *
   * <p>This is similar to disconnect. As this class implements AutoCloseable, close should be
   * implemented.
   */
  @Override
  public void close() throws Exception {
    disconnect();
  }

  /**
   * Creates HMS full snapshot.
   *
   * <p>The current notification ID is read before and after the snapshot is taken. If the two
   * differ, the notifications in between are fetched and applied on top of the snapshot. An
   * empty image is returned when the client is not connected, when the snapshot is empty, when
   * the intermediate notifications cannot be retrieved, or when communication with HMS fails.
   *
   * @return Full path snapshot and the last notification id on success
   */
  PathsImage getFullSnapshot() {
    if (client == null) {
      LOGGER.error(NOT_CONNECTED_MSG);
      return emptyPathsImage();
    }

    try {
      CurrentNotificationEventId eventIdBefore = client.getCurrentNotificationEventId();
      Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
      if (pathsFullSnapshot.isEmpty()) {
        return new PathsImage(pathsFullSnapshot, SentryStore.EMPTY_NOTIFICATION_ID,
            SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
      }

      CurrentNotificationEventId eventIdAfter = client.getCurrentNotificationEventId();
      long idBefore = eventIdBefore.getEventId();
      long idAfter = eventIdAfter.getEventId();
      LOGGER.info("NotificationID, Before Snapshot: {}, After Snapshot {}", idBefore, idAfter);

      if (eventIdAfter.equals(eventIdBefore)) {
        LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.",
            idAfter);
        // As eventIdAfter is the last event that was processed, eventIdAfter is used to update
        // lastProcessedNotificationID instead of getting it from persistent store.
        return new PathsImage(pathsFullSnapshot, idAfter, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
      }

      LOGGER.info("Reconciling full snapshot - applying {} changes", idAfter - idBefore);

      // While we were taking snapshot, HMS made some changes, so now we need to apply all
      // extra events to the snapshot
      long currentEventId = idBefore;
      SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer();
      // Set once an event beyond idAfter is seen; everything in range has then been handled.
      boolean reachedEndOfRange = false;

      while (!reachedEndOfRange && currentEventId < idAfter) {
        NotificationEventResponse response =
            client.getNextNotification(currentEventId, Integer.MAX_VALUE, null);
        if (response == null || !response.isSetEvents() || response.getEvents().isEmpty()) {
          LOGGER.error("Snapshot discarded, updates to HMS data while shapshot is being taken. "
              + "ID Before: {}. ID After: {}", idBefore, idAfter);
          return emptyPathsImage();
        }

        long idBeforeBatch = currentEventId;
        for (NotificationEvent event : response.getEvents()) {
          long id = event.getEventId();
          if (id <= idBefore) {
            LOGGER.error("Received stray event with eventId {} which is less then {}",
                id, idBefore);
            continue;
          }
          if (id > idAfter) {
            // Enough events processed
            reachedEndOfRange = true;
            break;
          }
          try {
            FullUpdateModifier.applyEvent(pathsFullSnapshot, event, deserializer);
          } catch (Exception e) {
            // Best effort: a single event that cannot be applied does not abort the snapshot.
            LOGGER.warn("Failed to apply operation", e);
          }
          currentEventId = id;
        }

        // A non-empty batch that did not advance the position would be requested again
        // unchanged, so give up instead of looping forever.
        if (!reachedEndOfRange && currentEventId <= idBeforeBatch) {
          LOGGER.error("Snapshot discarded, no progress applying HMS notifications. "
              + "Last applied ID: {}. ID After: {}", currentEventId, idAfter);
          return emptyPathsImage();
        }
      }

      LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.",
          currentEventId);
      // currentEventId is the last event that was applied to the snapshot, so it is used to
      // update lastProcessedNotificationID instead of getting it from persistent store.
      return new PathsImage(pathsFullSnapshot, currentEventId,
          SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
    } catch (TException failure) {
      LOGGER.error("Failed to communicate to HMS", failure);
      return emptyPathsImage();
    }
  }

  /**
   * Builds the image returned whenever a snapshot could not be produced.
   *
   * @return image with no paths, {@code SentryStore.EMPTY_NOTIFICATION_ID} and
   *     {@code SentryStore.EMPTY_PATHS_SNAPSHOT_ID}
   */
  private static PathsImage emptyPathsImage() {
    return new PathsImage(Collections.<String, Set<String>>emptyMap(),
        SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
  }

  /**
   * Retrieve a Hive full snapshot from HMS.
   *
   * <p>The elapsed time is recorded in the snapshot timer. Any failure increments the
   * failed-snapshot counter and yields an empty map.
   *
   * @return HMS snapshot. Snapshot consists of a mapping from auth object name to the set of paths
   *     corresponding to that name.
   */
  private Map<String, Set<String>> fetchFullUpdate() {
    LOGGER.info("Request full HMS snapshot");
    try (FullUpdateInitializer updateInitializer =
             new FullUpdateInitializer(hiveConnectionFactory, conf);
         Context context = updateTimer.time()) {
      Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
      LOGGER.info("Obtained full HMS snapshot");
      return pathsUpdate;
    } catch (Exception e) {
      failedSnapshotsCount.inc();
      LOGGER.error("Snapshot created failed ", e);
      return emptyMap();
    }
  }

  /**
   * Returns all HMS notifications with ID greater than the specified one.
   *
   * <p>An empty collection is returned when the client is not connected, when the latest HMS
   * notification ID is not greater than {@code notificationId}, or when HMS returns no events.
   *
   * @param notificationId ID of the last notification that was processed.
   * @return Collection of new events to be synced
   * @throws Exception when an error occurs when talking to the HMS client
   */
  Collection<NotificationEvent> getNotifications(long notificationId) throws Exception {
    if (client == null) {
      LOGGER.error(NOT_CONNECTED_MSG);
      return Collections.emptyList();
    }

    LOGGER.debug("Checking for notifications beyond {}", notificationId);

    // A bug HIVE-15761 (fixed on Hive 2.4.0) should allow requesting notifications using
    // an unprocessed notification ID without causing an exception. For now, we just
    // leave this workaround and log debug messages.
    CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
    // Check for null before the first dereference; without an ID the shortcuts are skipped.
    if (eventId != null) {
      long lastHmsId = eventId.getEventId();
      LOGGER.debug("ID of Last HMS notifications is: {}", lastHmsId);
      if (lastHmsId < notificationId) {
        LOGGER.debug("Last notification of HMS is smaller than what sentry processed, "
            + "Something is wrong. Sentry will request a full Snapshot");
        return Collections.emptyList();
      }

      if (lastHmsId == notificationId) {
        // Nothing new since the last processed notification.
        return Collections.emptyList();
      }
    }

    NotificationEventResponse response =
        client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
    if (response != null && response.isSetEvents()) {
      LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}",
          notificationId, response.getEvents().size());
      return response.getEvents();
    }

    return Collections.emptyList();
  }

  /**
   * Returns the latest notification ID logged by the HMS.
   *
   * @return the latest notification Id logged by the HMS, or
   *     {@code SentryStore.EMPTY_NOTIFICATION_ID} if the client is not connected or HMS did not
   *     report an ID
   * @throws Exception when an error occurs when talking to the HMS client
   */
  long getCurrentNotificationId() throws Exception {
    if (client == null) {
      LOGGER.error(NOT_CONNECTED_MSG);
      return SentryStore.EMPTY_NOTIFICATION_ID;
    }

    CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
    if (eventId != null && eventId.isSetEventId()) {
      return eventId.getEventId();
    }

    return SentryStore.EMPTY_NOTIFICATION_ID;
  }
}
