// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License

package org.apache.impala.catalog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.Uninterruptibles;
import com.sun.management.GarbageCollectorMXBean;
import com.sun.management.GcInfo;
import org.apache.impala.common.Reference;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TTableName;
import org.apache.log4j.Logger;

import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Automatically invalidates recently unused tables. There are currently 2 rules
 * implemented:
 * 1. Invalidate a certain percentage of the least recently used tables after a GC with an
 * almost full old generation. The fullness of the GC generation depends on the maximum
 * heap size.
 * 2. If invalidate_tables_timeout_s is set in the backend, unused tables older than the
 * threshold are invalidated periodically.
 */
public class CatalogdTableInvalidator {
  public static final Logger LOG = Logger.getLogger(CatalogdTableInvalidator.class);
  /**
   * Plugable time source for tests. Defined as static to avoid passing
   * CatalogdTableInvalidator everywhere the clock is used.
   */
  @VisibleForTesting
  static Ticker TIME_SOURCE = Ticker.systemTicker();
  private static long DAEMON_MAXIMUM_SLEEP_NANO = TimeUnit.MINUTES.toNanos(5);
  final private long unusedTableTtlNano_;
  final private boolean invalidateTableOnMemoryPressure_;
  final private CatalogServiceCatalog catalog_;
  /**
   * A thread waking up periodically to check if eviction is needed.
   */
  final private Thread daemonThread_;
  /**
   * The threshold above which the old gen is considered almost full.
   */
  final private double oldGenFullThreshold_;
  /**
   * The ratio of tables to invalidate when the old gen is almost full.
   */
  final private double gcInvalidationFraction_;
  /**
   * The number of times the daemon thread wakes up and scans the tables for invalidation.
   * It's useful for tests to ensure that a scan happened.
   */
  @VisibleForTesting
  AtomicLong scanCount_ = new AtomicLong();
  private GarbageCollectorMXBean oldGenGcBean_;
  /**
   * The name of the old gen memory pool.
   */
  private String oldGcGenName_;
  /**
   * The value of oldGenGcBean_.getCollectionCount() when the last memory-based
   * invalidation was executed.
   */
  private long lastObservedGcCount_;
  private boolean stopped_ = false;
  /**
   * Last time an time-based invalidation is executed in nanoseconds.
   */
  private long lastInvalidationTime_;

  CatalogdTableInvalidator(CatalogServiceCatalog catalog, final long unusedTableTtlSec,
      boolean invalidateTableOnMemoryPressure, double oldGenFullThreshold,
      double gcInvalidationFraction) {
    catalog_ = catalog;
    unusedTableTtlNano_ = TimeUnit.SECONDS.toNanos(unusedTableTtlSec);
    oldGenFullThreshold_ = oldGenFullThreshold;
    gcInvalidationFraction_ = gcInvalidationFraction;
    lastInvalidationTime_ = TIME_SOURCE.read();
    invalidateTableOnMemoryPressure_ =
        invalidateTableOnMemoryPressure && tryInstallGcListener();
    daemonThread_ = new Thread(new DaemonThread());
    daemonThread_.setDaemon(true);
    daemonThread_.setName("CatalogTableInvalidator timer");
    daemonThread_.start();
  }

  public static CatalogdTableInvalidator create(CatalogServiceCatalog catalog,
      BackendConfig config) {
    final boolean invalidateTableOnMemoryPressure =
        config.invalidateTablesOnMemoryPressure();
    final int timeoutSec = config.getInvalidateTablesTimeoutS();
    final double gcOldGenFullThreshold =
        config.getInvalidateTablesGcOldGenFullThreshold();
    final double fractionOnMemoryPressure =
        config.getInvalidateTablesFractionOnMemoryPressure();
    Preconditions.checkArgument(timeoutSec >= 0,
        "invalidate_tables_timeout_s must be a non-negative integer.");
    Preconditions.checkArgument(gcOldGenFullThreshold >= 0 && gcOldGenFullThreshold <= 1,
        "invalidate_tables_gc_old_gen_full_threshold must be in [0, 1].");
    Preconditions
        .checkArgument(fractionOnMemoryPressure >= 0 && fractionOnMemoryPressure <= 1,
            "invalidate_tables_fraction_on_memory_pressure must be in [0, 1].");
    if (timeoutSec > 0 || invalidateTableOnMemoryPressure) {
      return new CatalogdTableInvalidator(catalog, timeoutSec,
          invalidateTableOnMemoryPressure, gcOldGenFullThreshold,
          fractionOnMemoryPressure);
    } else {
      return null;
    }
  }

  static long nanoTime() {
    return TIME_SOURCE.read();
  }

  /**
   * Try to find the old generation memory pool in the GC beans and listen to the GC bean
   * which the old gen memory pool belongs to. Return whether the GC beans are supported.
   * If the return value is false, the listener is not installed.
   */
  private boolean tryInstallGcListener() {
    String commonErrMsg = "Continuing without GC-triggered invalidation of tables.";
    List<GarbageCollectorMXBean> gcbeans = java.lang.management.ManagementFactory
        .getPlatformMXBeans(GarbageCollectorMXBean.class);
    GcNotificationListener gcNotificationListener = new GcNotificationListener();

    boolean foundOldPool = false;
    for (GarbageCollectorMXBean gcbean : gcbeans) {
      for (String poolName : gcbean.getMemoryPoolNames()) {
        if (!poolName.contains("Old")) continue;
        if (!(gcbean instanceof NotificationEmitter)) {
          LOG.warn("GCBean " + gcbean.getClass().getName() + " is not supported " +
              "because it does not implement NotificationEmitter. " + commonErrMsg);
          return false;
        }
        oldGenGcBean_ = gcbean;
        oldGcGenName_ = poolName;
        lastObservedGcCount_ = gcbean.getCollectionCount();
        foundOldPool = true;
        ((NotificationEmitter) gcbean)
            .addNotificationListener(gcNotificationListener, null, null);
        break;
      }
    }
    if (!foundOldPool) {
      LOG.warn("Cannot find old generation memory pool in the GC beans. " + commonErrMsg);
    }
    return foundOldPool;
  }

  /**
   * Detect whether a GC happened since the last observation and the old generation is
   * loaded more than the configured threshold. If so it returns true indicating that
   * tables should be evicted because of memory pressure.
   */
  private boolean shouldEvictFromFullHeapAfterGc() {
    if (!invalidateTableOnMemoryPressure_) return false;
    long gcCount = oldGenGcBean_.getCollectionCount();
    if (gcCount > lastObservedGcCount_) {
      lastObservedGcCount_ = gcCount;
      GcInfo lastGcInfo = oldGenGcBean_.getLastGcInfo();
      if (lastGcInfo == null) {
        LOG.warn("gcBean.getLastGcInfo() returned null. Table invalidation based on " +
            "memory pressure was skipped.");
        return false;
      }
      MemoryUsage tenuredGenUsage = lastGcInfo.getMemoryUsageAfterGc().get(oldGcGenName_);
      Preconditions.checkState(tenuredGenUsage != null);
      return tenuredGenUsage.getMax() * oldGenFullThreshold_ < tenuredGenUsage.getUsed();
    }
    return false;
  }

  private void invalidateSome(double invalidationFraction) {
    List<Table> tables = new ArrayList<>();
    for (Db db : catalog_.getAllDbs()) {
      for (Table table : db.getTables()) {
        if (!(table instanceof IncompleteTable)) tables.add(table);
      }
    }
    // TODO: use quick select
    Collections.sort(tables, new Comparator<Table>() {
      @Override
      public int compare(Table o1, Table o2) {
        return Long.compare(o1.getLastUsedTime(), o2.getLastUsedTime());
      }
    });
    for (int i = 0; i < tables.size() * invalidationFraction; ++i) {
      TTableName tTableName = tables.get(i).getTableName().toThrift();
      Reference<Boolean> tblWasRemoved = new Reference<>();
      Reference<Boolean> dbWasAdded = new Reference<>();
      catalog_.invalidateTable(tTableName, tblWasRemoved, dbWasAdded);
      LOG.info("Table " + tables.get(i).getFullName() + " invalidated due to memory " +
          "pressure.");
    }
  }

  private void invalidateOlderThan(long retireAgeNano) {
    long now = TIME_SOURCE.read();
    for (Db db : catalog_.getAllDbs()) {
      for (Table table : catalog_.getAllTables(db)) {
        if (table instanceof IncompleteTable) continue;
        long inactivityTime = now - table.getLastUsedTime();
        if (inactivityTime <= retireAgeNano) continue;
        Reference<Boolean> tblWasRemoved = new Reference<>();
        Reference<Boolean> dbWasAdded = new Reference<>();
        TTableName tTableName = table.getTableName().toThrift();
        catalog_.invalidateTable(tTableName, tblWasRemoved, dbWasAdded);
        LOG.info(
            "Invalidated " + table.getFullName() + " due to inactivity for " +
                TimeUnit.NANOSECONDS.toSeconds(inactivityTime) + " seconds.");
      }
    }
  }

  void stop() {
    synchronized (this) {
      stopped_ = true;
      notify();
    }
    try {
      daemonThread_.join();
    } catch (InterruptedException e) {
      LOG.warn("stop() is interrupted", e);
    }
  }

  @VisibleForTesting
  synchronized void wakeUpForTests() {
    notify();
  }

  private class DaemonThread implements Runnable {
    @Override
    public void run() {
      long sleepTimeNano = Math.min(unusedTableTtlNano_ / 10, DAEMON_MAXIMUM_SLEEP_NANO);
      while (true) {
        try {
          synchronized (CatalogdTableInvalidator.this) {
            if (stopped_) return;
            if (shouldEvictFromFullHeapAfterGc()) {
              invalidateSome(gcInvalidationFraction_);
              scanCount_.incrementAndGet();
            }
            long now = nanoTime();
            // Wait for a fraction of unusedTableTtlNano_ if time-based invalidation is
            // enabled
            if (unusedTableTtlNano_ > 0 && now >= lastInvalidationTime_ + sleepTimeNano) {
              invalidateOlderThan(unusedTableTtlNano_);
              lastInvalidationTime_ = now;
              scanCount_.incrementAndGet();
            }
            // Wait unusedTableTtlSec if it is configured. Otherwise wait
            // indefinitely.
            TimeUnit.NANOSECONDS.timedWait(CatalogdTableInvalidator.this, sleepTimeNano);
          }
        } catch (Exception e) {
          LOG.warn("Unexpected exception thrown while attempting to automatically " +
              "invalidate tables. Will retry in 5 seconds.", e);
          Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
        }
      }
    }
  }

  private class GcNotificationListener implements NotificationListener {
    @Override
    public void handleNotification(Notification notification, Object handback) {
      synchronized (CatalogdTableInvalidator.this) {
        CatalogdTableInvalidator.this.notify();
      }
    }
  }
}
