// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.analysis;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.catalog.FeDb;
import org.apache.impala.catalog.FeIncompleteTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.Table;
import org.apache.impala.common.InternalException;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.Frontend;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.TUniqueIdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * Loads all table and view metadata relevant for a single SQL statement and returns the
 * loaded tables in a StmtTableCache. Optionally marks important loading events in an
 * EventSequence.
 */
public class StmtMetadataLoader {
  private static final Logger LOG = LoggerFactory.getLogger(StmtMetadataLoader.class);

  // Thresholds expressed in number of catalog updates received while waiting for
  // metadata. They are compile-time constants, hence static rather than per-instance.
  // Progress is logged whenever the number of received updates is a positive multiple
  // of this value.
  private static final long DEBUG_LOGGING_NUM_CATALOG_UPDATES = 10;
  // The prioritized load request is re-sent whenever the number of received updates
  // is a positive multiple of this value.
  private static final long RETRY_LOAD_NUM_CATALOG_UPDATES = 20;

  // Source of the current catalog and of the table usage tracker. Never null.
  private final Frontend fe_;
  // Database used to resolve table-path candidates. Never null.
  private final String sessionDb_;
  // Timeline in which loading events are marked. May be null, in which case no events
  // are marked.
  private final EventSequence timeline_;

  // Results of the loading process. See StmtTableCache.
  private final Set<String> dbs_ = new HashSet<>();
  private final Map<TableName, FeTable> loadedOrFailedTbls_ = new HashMap<>();

  // Metrics for the metadata load.
  // Number of prioritizeLoad() RPCs issued to the catalogd.
  private int numLoadRequestsSent_ = 0;
  // Number of catalog topic updates received from the statestore.
  private int numCatalogUpdatesReceived_ = 0;

  /**
   * Result of a metadata load: the statement-relevant database names and tables
   * together with the ImpaladCatalog reference that was current when loading finished.
   * Every entry in the tables map is guaranteed to point to a loaded table, meaning the
   * table was either loaded successfully or a load was attempted but failed. A table
   * or database name that is absent was not in the Catalog at the time this
   * StmtTableCache was generated.
   */
  public static final class StmtTableCache {
    public final FeCatalog catalog;
    public final Set<String> dbs;
    public final Map<TableName, FeTable> tables;

    /**
     * All arguments must be non-null, and the database of every key in 'tables' must
     * be contained in 'dbs'.
     */
    public StmtTableCache(FeCatalog catalog, Set<String> dbs,
        Map<TableName, FeTable> tables) {
      Preconditions.checkNotNull(catalog);
      Preconditions.checkNotNull(dbs);
      Preconditions.checkNotNull(tables);
      this.catalog = catalog;
      this.dbs = dbs;
      this.tables = tables;
      validate();
    }

    /**
     * Fails with an IllegalStateException if some table's database is not in 'dbs'.
     */
    private void validate() {
      tables.keySet().forEach(
          tblName -> Preconditions.checkState(dbs.contains(tblName.getDb())));
    }
  }

  /**
   * Creates a loader bound to the given frontend and session database. Both 'fe' and
   * 'sessionDb' are required to be non-null. 'timeline' is optional: pass null to
   * disable event marking.
   */
  public StmtMetadataLoader(Frontend fe, String sessionDb, EventSequence timeline) {
    Preconditions.checkNotNull(fe);
    Preconditions.checkNotNull(sessionDb);
    this.fe_ = fe;
    this.sessionDb_ = sessionDb;
    this.timeline_ = timeline;
  }

  // Accessors below exist for testing.

  // Returns the timeline passed at construction, which may be null.
  public EventSequence getTimeline() {
    return timeline_;
  }

  // Returns how many prioritized load requests have been sent so far.
  public int getNumLoadRequestsSent() {
    return numLoadRequestsSent_;
  }

  // Returns how many catalog updates have been waited for so far.
  public int getNumCatalogUpdatesReceived() {
    return numCatalogUpdatesReceived_;
  }

  /**
   * Gathers every table/view candidate referenced by 'stmt' and loads them via
   * loadTables(Set). The start and end of metadata loading are marked in 'timeline_'
   * when it is non-NULL.
   * Must only be called once for a single statement.
   */
  public StmtTableCache loadTables(StatementBase stmt) throws InternalException {
    return loadTables(collectTableCandidates(stmt));
  }

  /**
   * Loads the tables/views with the given names and returns them. As views become
   * loaded, the set of table/views still to be loaded is expanded based on the view
   * definitions. For tables/views missing metadata this function issues a loading
   * request to the catalog server and then waits for the metadata to arrive through
   * a statestore topic update.
   * This function succeeds even across catalog restarts for the following reasons:
   * - The loading process is strictly additive, i.e., a new loaded table may be added
   *   to the 'loadedOrFailedTbls_' map, but an existing entry is never removed, even if
   *   the equivalent table in the impalad catalog is different.
   * - Tables on the impalad side are not modified in place. This means that an entry in
   *   the 'loadedOrFailedTbls_' will always remain in the loaded state.
   * Tables/views that are already loaded are simply included in the result.
   * Marks the start and end of metadata loading in 'timeline_' if it is non-NULL.
   * Must only be called once for a single statement; this is enforced by checking that
   * no results or metrics have been accumulated yet.
   */
  public StmtTableCache loadTables(Set<TableName> tbls) throws InternalException {
    Preconditions.checkState(dbs_.isEmpty() && loadedOrFailedTbls_.isEmpty());
    Preconditions.checkState(numLoadRequestsSent_ == 0);
    Preconditions.checkState(numCatalogUpdatesReceived_ == 0);
    FeCatalog catalog = fe_.getCatalog();
    Set<TableName> missingTbls = getMissingTables(catalog, tbls);
    // There are no missing tables. Return to avoid making an RPC to the CatalogServer
    // and adding load start/finish events to the timeline.
    if (missingTbls.isEmpty()) {
      if (timeline_ != null) {
        timeline_.markEvent(String.format("Metadata of all %d tables cached",
            loadedOrFailedTbls_.size()));
      }
      return buildTableCache(catalog);
    }

    if (timeline_ != null) timeline_.markEvent("Metadata load started");
    long startTimeMs = System.currentTimeMillis();

    // All tables for which we have requested a prioritized load.
    Set<TableName> requestedTbls = new HashSet<>();

    // Loading a fixed set of tables happens in two steps:
    // 1) Issue a loading request RPC to the catalogd.
    // 2) Wait for the loaded tables to arrive via the statestore.
    // The second step could take a while and we should avoid repeatedly issuing
    // redundant RPCs to the catalogd. This flag indicates whether a loading RPC
    // should be issued. See below for more details in which circumstances this
    // flag is set to true.
    boolean issueLoadRequest = true;
    // Loop until all the missing tables are loaded in the Impalad's catalog cache.
    // In every iteration of this loop we wait for one catalog update to arrive.
    //
    // This isn't relevant for LocalCatalog, since we loaded all of the table references
    // on-demand in the first recursive call to 'getMissingTables' above.
    while (!missingTbls.isEmpty()) {
      if (issueLoadRequest) {
        catalog.prioritizeLoad(missingTbls);
        ++numLoadRequestsSent_;
        requestedTbls.addAll(missingTbls);
      }

      // Catalog may have been restarted, always use the latest reference.
      FeCatalog currCatalog = fe_.getCatalog();
      boolean hasCatalogRestarted = currCatalog != catalog;
      if (hasCatalogRestarted && LOG.isWarnEnabled()) {
        LOG.warn(String.format(
            "Catalog restart detected while waiting for table metadata. " +
            "Current catalog service id: %s. Previous catalog service id: %s",
            TUniqueIdUtil.PrintId(currCatalog.getCatalogServiceId()),
            TUniqueIdUtil.PrintId(catalog.getCatalogServiceId())));
      }
      catalog = currCatalog;

      // Log progress and wait time for debugging.
      if (hasCatalogRestarted
          || (numCatalogUpdatesReceived_ > 0
              && numCatalogUpdatesReceived_ % DEBUG_LOGGING_NUM_CATALOG_UPDATES == 0)) {
        if (LOG.isInfoEnabled()) {
          long endTimeMs = System.currentTimeMillis();
          LOG.info(String.format("Waiting for table metadata. " +
              "Waited for %d catalog updates and %dms. Tables remaining: %s",
              numCatalogUpdatesReceived_, endTimeMs - startTimeMs, missingTbls));
        }
      }

      // Wait for the next catalog update and then revise the loaded/missing tables.
      catalog.waitForCatalogUpdate(Frontend.MAX_CATALOG_UPDATE_WAIT_TIME_MS);
      Set<TableName> newMissingTbls = getMissingTables(catalog, missingTbls);
      // Issue a load request for the new missing tables in these cases:
      // 1) Catalog has restarted so all in-flight loads have been lost
      // 2) There are new missing tables due to view expansion
      issueLoadRequest = hasCatalogRestarted || !missingTbls.containsAll(newMissingTbls);
      // 3) Periodically retry to avoid a hang due to anomalies/bugs, e.g.,
      //    a previous load request was somehow lost on the catalog side, or the table
      //    was invalidated after being loaded but before being sent to this impalad
      if (!issueLoadRequest && numCatalogUpdatesReceived_ > 0
          && numCatalogUpdatesReceived_ % RETRY_LOAD_NUM_CATALOG_UPDATES == 0) {
        issueLoadRequest = true;
        if (LOG.isInfoEnabled()) {
          long endTimeMs = System.currentTimeMillis();
          LOG.info(String.format("Re-sending prioritized load request. " +
              "Waited for %d catalog updates and %dms.",
              numCatalogUpdatesReceived_, endTimeMs - startTimeMs));
        }
      }
      missingTbls = newMissingTbls;
      ++numCatalogUpdatesReceived_;
    }

    if (timeline_ != null) {
      markLoadFinished(requestedTbls);
      markValidWriteIds();
    }
    return buildTableCache(catalog);
  }

  /**
   * Records the usage of all tables in 'loadedOrFailedTbls_' with the frontend's table
   * usage tracker and wraps the accumulated results in a StmtTableCache that refers
   * to the given 'catalog'.
   */
  private StmtTableCache buildTableCache(FeCatalog catalog) {
    fe_.getImpaladTableUsageTracker().recordTableUsage(loadedOrFailedTbls_.keySet());
    return new StmtTableCache(catalog, dbs_, loadedOrFailedTbls_);
  }

  /**
   * Marks the "Metadata load finished" event in 'timeline_', which must be non-null.
   * The reported storage load time is summed only over Table instances whose names
   * are in 'requestedTbls', i.e., it does not include tables that were already loaded
   * before a load was requested for this statement.
   */
  private void markLoadFinished(Set<TableName> requestedTbls) {
    long storageLoadTimeNano =
        loadedOrFailedTbls_.values()
            .stream()
            .filter(Table.class::isInstance)
            .map(Table.class::cast)
            .filter(loadedTbl -> requestedTbls.contains(loadedTbl.getTableName()))
            .mapToLong(Table::getStorageLoadTime)
            .sum();
    timeline_.markEvent(String.format("Metadata load finished. "
            + "loaded-tables=%d/%d load-requests=%d catalog-updates=%d "
            + "storage-load-time=%dms",
        requestedTbls.size(), loadedOrFailedTbls_.size(), numLoadRequestsSent_,
        numCatalogUpdatesReceived_,
        TimeUnit.MILLISECONDS.convert(storageLoadTimeNano, TimeUnit.NANOSECONDS)));
  }

  /**
   * Marks an event in 'timeline_' (which must be non-null) listing the valid write ids
   * of every transactional table in 'loadedOrFailedTbls_'. Does nothing if the
   * metastore major version is not greater than 2 or if there is no such table.
   */
  private void markValidWriteIds() {
    if (MetastoreShim.getMajorVersion() <= 2) return;
    StringBuilder validIdsBuf = new StringBuilder("Loaded ValidWriteIdLists");
    validIdsBuf.append(" for transactional tables: ");
    boolean hasAcidTbls = false;
    for (FeTable iTbl : loadedOrFailedTbls_.values()) {
      // Incomplete tables are skipped before their metastore table is accessed.
      if (iTbl instanceof FeIncompleteTable) continue;
      if (AcidUtils.isTransactionalTable(iTbl.getMetaStoreTable().getParameters())) {
        validIdsBuf.append("\n");
        validIdsBuf.append("           ");
        validIdsBuf.append(iTbl.getValidWriteIds());
        hasAcidTbls = true;
      }
    }
    if (!hasAcidTbls) return;
    validIdsBuf.append("\n");
    validIdsBuf.append("             ");
    timeline_.markEvent(validIdsBuf.toString());
  }

  /**
   * Classifies each of 'tbls' as loaded or not loaded in the given catalog and returns
   * the names of those that are not loaded. As a side effect, the names of referenced
   * databases that exist are added to 'dbs_' and loaded tables are added to
   * 'loadedOrFailedTbls_'. Names already present in 'loadedOrFailedTbls_' are skipped.
   * For every loaded view, the table candidates of its query statement are processed
   * recursively; the candidates are built from 'sessionDb_' with
   * Path.getCandidateTables(). Tables that do not exist are ignored: they are neither
   * returned nor added to 'loadedOrFailedTbls_'.
   */
  private Set<TableName> getMissingTables(FeCatalog catalog, Set<TableName> tbls) {
    Set<TableName> unloaded = new HashSet<>();
    Set<TableName> viewCandidates = new HashSet<>();
    for (TableName name : tbls) {
      if (loadedOrFailedTbls_.containsKey(name)) continue;
      FeDb db = catalog.getDb(name.getDb());
      if (db == null) continue;
      // The database exists, so remember it even if the table turns out not to.
      dbs_.add(name.getDb());
      FeTable tbl = db.getTable(name.getTbl());
      if (tbl == null) continue;
      if (tbl.isLoaded()) {
        loadedOrFailedTbls_.put(name, tbl);
        if (tbl instanceof FeView) {
          FeView view = (FeView) tbl;
          viewCandidates.addAll(collectTableCandidates(view.getQueryStmt()));
        }
      } else {
        unloaded.add(name);
      }
    }
    // Nothing to expand: no loaded view was encountered in this pass.
    if (viewCandidates.isEmpty()) return unloaded;
    // Recurse into the tables referenced by the loaded views.
    unloaded.addAll(getMissingTables(catalog, viewCandidates));
    return unloaded;
  }

  /**
   * Computes the names of all tables whose metadata may be needed to analyze 'stmt',
   * which must be non-null. Every table reference collected from 'stmt' is expanded
   * into all of its candidate table names via Path.getCandidateTables(), using
   * 'sessionDb_' as the session database.
   */
  private Set<TableName> collectTableCandidates(StatementBase stmt) {
    Preconditions.checkNotNull(stmt);
    List<TableRef> refs = new ArrayList<>();
    stmt.collectTableRefs(refs);
    Set<TableName> candidates = new HashSet<>();
    refs.forEach(
        ref -> candidates.addAll(Path.getCandidateTables(ref.getPath(), sessionDb_)));
    return candidates;
  }
}
