/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.PluginHandle.PluginType;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plugin registry. Caches plugin instances which correspond to configurations
 * stored in persistent storage. Synchronizes the instances and storage.
 * <p>
 * Allows multiple "locators" to provide plugin classes such as the "classic"
 * version for classes in the same class loader, the "system" version for
 * system-defined plugins.
 * <p>
 * Provides multiple layers of abstraction:
 * <ul>
 * <li>A plugin config/implementation pair (called a "connector" here)
 * is located by</li>
 * <li>A connector locator, which also provides bootstrap plugins and can
 * create a plugin instance from a configuration, which are cached in</li>
 * <li>The plugin cache, which holds stored, system and ad-hoc plugins. The
 * stored plugins are backed by</li>
 * <li>A persistent store: the file system for tests and embedded, ZK for
 * a distributed server, or</li>
 * <li>An ephemeral cache for unnamed configs, such as those created by
 * a table function.</li>
 * </ul>
 * <p>
 * The idea is to push most functionality into the above abstractions,
 * leaving overall coordination here.
 * <p>
 * Plugins themselves have multiple levels of definitions:
 * <ul>
 * <li>The config and plugin classes, provided by the locator.</li>
 * <li>The {@link ConnectorHandle} which defines the config class and
 * the locator which can create instances of that class.</li>
 * <li>A config instance which is typically deserialized from JSON
 * independent of the implementation class.</li>
 * <li>A {@link PluginHandle} which pairs the config with a name as
 * the unit that the user thinks of as a "plugin." The plugin entry
 * links to the {@code ConnectorEntry} to create the instance lazily
 * when first requested.</li>
 * <li>The plugin class instance, which provides long-term state and
 * which provides the logic for the plugin.</li>
 * </ul>
 *
 * <h4>Concurrency</h4>
 *
 * Drill is a concurrent system; multiple users can attempt to add, remove
 * and update plugin configurations at the same time. The only good
 * solution would be to version the plugin configs. Instead, we rely on
 * the fact that configs change infrequently.
 * <p>
 * The code syncs the in-memory cache with the persistent store on each
 * access (which is actually inefficient and should be reviewed.)
 * <p>
 * During refresh, it could be that another thread is doing exactly
 * the same thing, or even fighting us by changing the config. It is
 * impossible to ensure a totally consistent answer. The goal is to
 * make sure that the cache ends up agreeing with the persistent store
 * as it was at some point in time.
 * <p>
 * The {@link PluginsMap} class provides in-memory synchronization of the
 * name and config maps. Careful coding is needed when handling refresh
 * since another thread could make the same changes.
 * <p>
 * Once the planner obtains a plugin, another user could come along and
 * change the config for that plugin. Drill treats that change as another
 * plugin: the original one continues to be used by the planner (but see
 * below), while new queries use the new version.
 * <p>
 * Since the config on remote servers may have changed relative to the one
 * this Foreman used for planning, the plan includes the plugin config
 * itself (not just a reference to the config.) This works because the
 * config is usually small.
 *
 * <h4>Ephemeral Plugins</h4>
 *
 * An ephemeral plugin handles table functions which create a temporary,
 * unnamed configuration that is needed only for the execution of a
 * single query, but which may be used across many threads. If the same
 * table function is used multiple times, then the same ephemeral plugin
 * will be used across queries. Ephemeral plugins are based on the
 * same connectors as stored plugins, but are not visible to the planner.
 * They expire after a period without access, or when the ephemeral cache
 * exceeds its maximum size.
 * <p>
 * The ephemeral store also acts as a graveyard for deleted or changed
 * plugins. When removing a plugin, the old plugin is moved to ephemeral
 * storage to allow running queries to locate it. Similarly, when a
 * new configuration is stored, the corresponding plugin is retrieved
 * from ephemeral storage, if it exists. This avoids odd cases where
 * the same plugin exists in both normal and ephemeral storage.
 *
 * <h4>Caveats</h4>
 *
 * The main problem with synchronization at present is that plugins
 * provide a {@code close()} method that, if used, could render the
 * plugin unusable. Suppose a Cassandra plugin, say, maintains a connection
 * to a server used across multiple queries and threads. Any change to
 * the config immediately calls {@code close()} on the plugin, even though
 * it may be in use in planning a query on another thread. Random failures
 * will result.
 * <p>
 * The same issue can affect ephemeral plugins: if the number in the cache
 * reaches the limit, the registry will start closing old ones, without
 * knowing if that plugin is actually in use.
 * <p>
 * The workaround is to not actually honor the {@code close()} call. Longer
 * term, a reference count is needed.
 *
 * <h4>Error Handling</h4>
 *
 * Error handling needs review. Those problems that result from user actions
 * should be raised as a {@code UserException}. Those that violate invariants
 * should be raised as other forms of exception.
 */
public class StoragePluginRegistryImpl implements StoragePluginRegistry {
  private static final Logger logger = LoggerFactory.getLogger(StoragePluginRegistryImpl.class);

  /**
   * Registry-facing view of the Drillbit context; handed to each
   * connector locator when the locator is constructed.
   */
  private final PluginRegistryContext context;

  /**
   * Cache of enabled, stored plugins, as well as system and ad-hoc
   * plugins. Plugins live in the cache until Drillbit exit, or
   * (except for system plugins) explicitly removed.
   */
  private final StoragePluginMap pluginCache;

  /**
   * Schema factory returned from {@link #getSchemaFactory()}; it is
   * constructed with a reference back to this registry.
   */
  private final DrillSchemaFactory schemaFactory;

  /**
   * Persistent store of named plugin configurations. Writes go here
   * before the in-memory cache is updated.
   */
  private final StoragePluginStore pluginStore;

  /**
   * Cache of unnamed plugins typically resulting from table functions.
   * Also receives handles displaced from {@link #pluginCache}. Entries
   * expire 24 hours after last access, or when the cache grows beyond
   * 250 entries; a removed entry's handle is closed.
   */
  private final LoadingCache<StoragePluginConfig, PluginHandle> ephemeralPlugins;

  /**
   * Ordered list of locators which provide connector implementations.
   */
  private final List<ConnectorLocator> locators = new ArrayList<>();

  /**
   * Map of config class (as deserialized from the persistent store or UI)
   * to the connector which can create a plugin handle for that config.
   * Keys are compared by identity, so only an exact class match is found.
   */
  private final Map<Class<? extends StoragePluginConfig>, ConnectorHandle> connectors =
      new IdentityHashMap<>();

  /**
   * Creates the registry and its collaborators: the plugin cache, schema
   * factory, the classic and system connector locators, the persistent
   * store and the ephemeral plugin cache. No plugins are loaded here;
   * call {@link #init()} to do so.
   *
   * @param context the Drillbit context used to build the registry
   * context and the persistent store
   */
  public StoragePluginRegistryImpl(DrillbitContext context) {
    this.context = new DrillbitPluginRegistryContext(context);
    this.pluginCache = new StoragePluginMap();
    this.schemaFactory = new DrillSchemaFactory(null, this);
    locators.add(new ClassicConnectorLocator(this.context));
    locators.add(new SystemPluginLocator(this.context));
    this.pluginStore = new StoragePluginStoreImpl(context);

    // Any entry leaving the ephemeral cache (expiry, size limit or
    // explicit invalidation) has its handle closed.
    RemovalListener<StoragePluginConfig, PluginHandle> closeOnRemoval =
        notification -> notification.getValue().close();

    // Cache misses create a handle under a fixed placeholder name.
    CacheLoader<StoragePluginConfig, PluginHandle> ephemeralLoader =
        new CacheLoader<StoragePluginConfig, PluginHandle>() {
          @Override
          public PluginHandle load(StoragePluginConfig config) throws Exception {
            return createPluginEntry("$$ephemeral$$", config, PluginType.EPHEMERAL);
          }
        };

    this.ephemeralPlugins = CacheBuilder.newBuilder()
        .expireAfterAccess(24, TimeUnit.HOURS)
        .maximumSize(250)
        .removalListener(closeOnRemoval)
        .build(ephemeralLoader);
  }

  /**
   * Initializes every locator, then loads intrinsic plugins, registers
   * the configurable connectors and finally prepares the persistent store.
   */
  @Override
  public void init() {
    for (ConnectorLocator locator : locators) {
      locator.init();
    }
    loadIntrinsicPlugins();
    defineConnectors();
    prepareStore();
  }

  /**
   * Asks each locator for its intrinsic plugins. For each one, registers
   * an intrinsic connector and places a handle for the plugin in the
   * plugin cache. Locators that return {@code null} are skipped.
   */
  private void loadIntrinsicPlugins() {
    for (ConnectorLocator locator : locators) {
      Collection<StoragePlugin> builtIns = locator.intrinsicPlugins();
      if (builtIns != null) {
        for (StoragePlugin builtIn : builtIns) {
          ConnectorHandle handle = ConnectorHandle.intrinsicConnector(locator, builtIn);
          defineConnector(handle);
          pluginCache.put(new PluginHandle(builtIn, handle, PluginType.INTRINSIC));
        }
      }
    }
  }

  /**
   * Registers a connector under its config class.
   *
   * @param connector the connector to register
   * @throws IllegalStateException if a connector was already registered
   * for the same config class; the conflict is also logged as an error
   */
  private void defineConnector(ConnectorHandle connector) {
    ConnectorHandle existing = connectors.put(connector.configClass(), connector);
    if (existing == null) {
      return;
    }
    String msg = String.format(
        "Two connectors defined for the same config: %s -> %s and %s -> %s",
        connector.configClass().getName(), connector.locator().getClass().getName(),
        existing.configClass().getName(), existing.locator().getClass().getName());
    logger.error(msg);
    throw new IllegalStateException(msg);
  }

  /**
   * Registers a configured connector for every config class reported by
   * each locator. Locators that return {@code null} are skipped.
   */
  private void defineConnectors() {
    for (ConnectorLocator locator : locators) {
      Set<Class<? extends StoragePluginConfig>> configClasses = locator.configClasses();
      if (configClasses != null) {
        for (Class<? extends StoragePluginConfig> configClass : configClasses) {
          defineConnector(ConnectorHandle.configuredConnector(locator, configClass));
        }
      }
    }
  }

  /**
   * Loads the enabled plugins from the persistent store. An empty store
   * is initialized from the bootstrap configuration; a populated store
   * is upgraded instead.
   */
  private void prepareStore() {
    boolean storeHasPlugins = loadEnabledPlugins();
    if (!storeHasPlugins) {
      initStore();
      return;
    }
    upgradeStore();
  }

  /**
   * Populates an empty persistent store with the bootstrap plugins
   * gathered from all locators, then notifies each locator via
   * {@code onUpgrade()}.
   *
   * @throws IllegalStateException if a locator fails with an
   * {@code IOException} while providing its bootstrap plugins
   */
  private void initStore() {
    logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
    StoragePlugins bootstrap = new StoragePlugins();
    try {
      for (ConnectorLocator locator : locators) {
        StoragePlugins fromLocator = locator.bootstrapPlugins();
        if (fromLocator == null) {
          continue;
        }
        bootstrap.putAll(fromLocator);
      }
    } catch (IOException e) {
      throw new IllegalStateException(
          "Failure initializing the plugin store. Drillbit exiting.", e);
    }
    pluginStore.putAll(bootstrap);
    for (ConnectorLocator locator : locators) {
      locator.onUpgrade();
    }
  }

  /**
   * Upgrade an existing persistent plugin config store with
   * updates available from each locator.
   * <p>
   * Collects the updated plugins from all locators, skipping locators
   * that return {@code null}. Each updated config is then written to the
   * store; when the store already holds a config of the same name, the
   * existing config is passed to {@link #copyPluginStatus} first so the
   * enabled status can carry over.
   */
  private void upgradeStore() {
    StoragePlugins upgraded = new StoragePlugins();
    for (ConnectorLocator locator : locators) {
      StoragePlugins locatorPlugins = locator.updatedPlugins();

      // Guard the locator's result, not the accumulator: "upgraded" is
      // never null, while a locator with no updates may hand back null.
      if (locatorPlugins != null) {
        upgraded.putAll(locatorPlugins);
      }
    }
    if (upgraded.isEmpty()) {
      return;
    }
    for (Map.Entry<String, StoragePluginConfig> newPlugin : upgraded) {
      StoragePluginConfig oldPluginConfig = pluginStore.get(newPlugin.getKey());
      if (oldPluginConfig != null) {
        copyPluginStatus(oldPluginConfig, newPlugin.getValue());
      }
      pluginStore.put(newPlugin.getKey(), newPlugin.getValue());
    }
  }

  /**
   * Decides the enabled status of an updated storage plugin config.
   * When the new config carries its own enabled status it is left
   * untouched. Otherwise it inherits the status of the old config, or is
   * disabled when there is no old config.
   *
   * @param oldPluginConfig
   *          current storage plugin config from Persistent Store or bootstrap
   *          config file; may be {@code null}
   * @param newPluginConfig
   *          new storage plugin config
   */
  protected static void copyPluginStatus(
      StoragePluginConfig oldPluginConfig,
      StoragePluginConfig newPluginConfig) {
    if (newPluginConfig.isEnabledStatusPresent()) {
      return;
    }
    newPluginConfig.setEnabled(oldPluginConfig != null && oldPluginConfig.isEnabled());
  }

  /**
   * Fills {@link #pluginCache} with a handle for every enabled plugin
   * found in the persistent store. A plugin whose handle cannot be
   * created is logged, marked disabled and written back to the store.
   *
   * @return {@code true} if the persistent store contained plugins
   * (and thus was initialized, and should perhaps be upgraded), or
   * {@code false} if no plugins were found and this is a new store
   * which should be initialized. Avoids the need to check persistent
   * store contents twice
   */
  private boolean loadEnabledPlugins() {
    Iterator<Entry<String, StoragePluginConfig>> stored = pluginStore.load();
    if (!stored.hasNext()) {
      // Empty store: (likely) a new install which must be initialized.
      return false;
    }
    while (stored.hasNext()) {
      Entry<String, StoragePluginConfig> item = stored.next();
      String pluginName = item.getKey();
      StoragePluginConfig pluginConfig = item.getValue();
      if (pluginConfig.isEnabled()) {
        try {
          PluginHandle handle = createPluginEntry(pluginName, pluginConfig, PluginType.STORED);
          pluginCache.put(handle);
        } catch (Exception e) {
          logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", pluginName, e);
          pluginConfig.setEnabled(false);
          pluginStore.put(pluginName, pluginConfig);
        }
      }
    }
    // At least one entry was present, so this is an existing registry.
    return true;
  }

  /**
   * Stores a named plugin configuration and updates the in-memory cache
   * to match. An enabled config replaces any cached plugin of that name;
   * a disabled config removes it. Whatever handle the cache gives back
   * is moved to the ephemeral cache.
   *
   * @param name plugin name
   * @param config the new configuration for that name
   * @throws UserException if {@code name} refers to an intrinsic
   * (system) plugin
   */
  @Override
  public void put(String name, StoragePluginConfig config) throws ExecutionSetupException {

    // Reject attempts to overwrite a system plugin up front. The cache
    // performs the same check later, but by then the persistent store
    // would already have been written.
    PluginHandle existing = pluginCache.get(name);
    if (existing != null && existing.isIntrinsic()) {
      throw UserException.permissionError()
        .message("Attempt to replace a system plugin.")
        .addContext("Plugin name", name)
        .addContext("Intrinsic plugin class", existing.config().getClass().getName())
        .addContext("Attempted replacement", config.getClass().getName())
        .build(logger);
    }

    // Persist first. From here on we race other threads to update the
    // cache: either we win or another thread does.
    pluginStore.put(name, config);

    // The cache update fails for a system plugin and is rejected if
    // another thread got there first.
    PluginHandle displaced = config.isEnabled()
        ? pluginCache.put(restoreFromEphemeral(name, config))
        : pluginCache.remove(name, config);

    // Running queries may still use the old config, so keep the
    // possibly-created instance reachable via the ephemeral store.
    moveToEphemeral(displaced);
  }

  /**
   * Produces a stored-plugin handle for the given (name, config) pair.
   * If the ephemeral cache holds an entry for the config whose name
   * matches (ignoring case), that entry is transferred out of ephemeral
   * storage and reused. Otherwise a new handle is created.
   *
   * @param name plugin name
   * @param config plugin config
   * @return a handle for the plugin which may have been retrieved from
   * ephemeral storage
   */
  private PluginHandle restoreFromEphemeral(String name,
      StoragePluginConfig config) {

    // Benign race condition between check and invalidate.
    PluginHandle cached = ephemeralPlugins.getIfPresent(config);
    boolean reusable = cached != null && name.equalsIgnoreCase(cached.name());
    if (!reusable) {
      return createPluginEntry(name, config, PluginType.STORED);
    }

    // Transfer the instance to a new handle before invalidating the
    // cache entry, so that the invalidate will not close the plugin
    // instance.
    PluginHandle promoted = cached.transfer(PluginType.STORED);
    ephemeralPlugins.invalidate(config);
    return promoted;
  }

  /**
   * Looks up the configuration of the named plugin.
   *
   * @param name plugin name
   * @return the plugin's config, or {@code null} if no entry is found
   */
  @Override
  public StoragePluginConfig getConfig(String name) {
    PluginHandle handle = getEntry(name);
    if (handle == null) {
      return null;
    }
    return handle.config();
  }

  /**
   * Gets a plugin with the named configuration.
   *
   * @param name plugin name
   * @return the plugin, or {@code null} if no entry is found
   */
  @Override
  public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
    PluginHandle handle = getEntry(name);
    if (handle == null) {
      return null;
    }

    // Lazy instantiation: the first call to plugin() creates the
    // actual plugin instance.
    return handle.plugin();
  }

  /**
   * Finds the handle for a named plugin. Intrinsic plugins are returned
   * straight from the cache. Any other name is checked against the
   * persistent store and the cache is refreshed to match.
   *
   * @param name plugin name
   * @return the current handle, or {@code null} if the refresh finds the
   * plugin missing or disabled in the store
   */
  private PluginHandle getEntry(String name) {
    PluginHandle cached = pluginCache.get(name);
    if (cached != null && cached.isIntrinsic()) {
      return cached;
    }
    StoragePluginConfig storedConfig = pluginStore.get(name);
    return cached == null
        ? refresh(name, storedConfig)
        : refresh(cached, storedConfig);
  }

  /**
   * Lazy refresh for a plugin not known on this server.
   *
   * @param name plugin name
   * @param config the config read from the persistent store, or
   * {@code null} if the store has none
   * @return {@code null} if the config is absent or disabled, else the
   * result of adding a handle to the cache with {@code putIfAbsent}
   */
  private PluginHandle refresh(String name, StoragePluginConfig config) {
    if (config != null && config.isEnabled()) {

      // Handles race conditions: some other thread may have just done what
      // we're trying to do. Note: no need to close the new entry if
      // there is a conflict: the plugin instance is created on demand
      // and we've not done so.
      return pluginCache.putIfAbsent(restoreFromEphemeral(name, config));
    }
    return null;
  }

  /**
   * Lazy refresh of a plugin we think we know about.
   *
   * @param entry the handle currently held in the cache
   * @param config the config read from the persistent store, or
   * {@code null} if the store has none
   * @return {@code null} if the plugin was deleted or disabled; the same
   * {@code entry} if its config is unchanged; otherwise the handle now
   * held in the cache for that name
   */
  private PluginHandle refresh(PluginHandle entry, StoragePluginConfig config) {

    // Deleted or disabled in persistent storage?
    boolean gone = config == null || !config.isEnabled();
    if (gone) {

      // Move the old config to the ephemeral store, but only if this
      // thread is the one that removed it from the cache.
      if (pluginCache.remove(entry)) {
        moveToEphemeral(entry);
      }
      return null;
    }

    // Unchanged?
    if (entry.config().equals(config)) {
      return entry;
    }

    // Plugin changed. Handle race condition on replacement.
    PluginHandle replacement = restoreFromEphemeral(entry.name(), config);
    if (!pluginCache.replace(entry, replacement)) {
      // Replacement was rejected: return whatever the cache holds now.
      return pluginCache.get(entry.name());
    }
    moveToEphemeral(entry);
    return replacement;
  }

  /**
   * Walks every plugin in the persistent store and runs the by-name
   * refresh on it. That refresh only adds plugins missing from the
   * cache ({@code putIfAbsent}), so already-cached plugins are not
   * updated here.
   */
  private void refresh() {
    for (Iterator<Entry<String, StoragePluginConfig>> it = pluginStore.load(); it.hasNext(); ) {
      Entry<String, StoragePluginConfig> stored = it.next();
      refresh(stored.getKey(), stored.getValue());
    }
  }

  /**
   * Gets the plugin for a configuration. A cached plugin with a matching
   * config is used when there is one. Otherwise the plugin comes from
   * the ephemeral cache, which creates an entry on first use.
   *
   * @param config the plugin configuration to look up
   * @return the plugin for that configuration
   * @throws ExecutionSetupException if loading the ephemeral plugin fails
   */
  @Override
  public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
    // First choice: a cached plugin with this exact configuration.
    PluginHandle known = pluginCache.get(config);
    if (known != null) {
      return known.plugin();
    }

    // No named plugin matches the desired configuration, so create an
    // ephemeral storage plugin (or get one from the cache).
    try {
      return ephemeralPlugins.get(config).plugin();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof ExecutionSetupException) {
        throw (ExecutionSetupException) cause;
      }
      // this shouldn't happen. here for completeness.
      throw new ExecutionSetupException(
          "Failure while trying to create ephemeral plugin.", cause);
    }
  }

  /**
   * Removes the named plugin from the cache, moving any removed handle
   * to the ephemeral cache, and deletes the name from the persistent
   * store.
   * <p>
   * This method is not thread-safe: there is no guarantee that the plugin
   * deleted is the same one the user requested: someone else could have
   * deleted the old one and added a new one of the same name.
   * TODO: Fix this
   *
   * @param name plugin name
   */
  @Override
  public void remove(String name) {
    // moveToEphemeral() ignores a null handle, so an unknown name
    // falls straight through to the store delete.
    moveToEphemeral(pluginCache.remove(name));

    // Must tell store to delete even if not known locally because
    // the store might hold a disabled version
    pluginStore.delete(name);
  }

  /**
   * Parks a handle in the ephemeral cache, keyed by its config. If the
   * cache already has an entry for that config, the given handle is
   * closed instead.
   *
   * @param handle the handle to park; {@code null} is ignored
   */
  private void moveToEphemeral(PluginHandle handle) {
    if (handle == null) {
      return;
    }

    // If already in the ephemeral store, don't replace.
    if (ephemeralPlugins.getIfPresent(handle.config()) != null) {
      handle.close();
      return;
    }

    // Race condition is benign: two threads both doing the put
    // will cause the first handle to be closed when the second hits.
    ephemeralPlugins.put(handle.config(), handle.transfer(PluginType.EPHEMERAL));
  }

  /**
   * Reads every entry from the persistent store, enabled or not, into a
   * new map.
   *
   * @return a new map of plugin name to config
   */
  @Override
  public Map<String, StoragePluginConfig> storedConfigs() {
    Map<String, StoragePluginConfig> configs = new HashMap<>();
    for (Iterator<Entry<String, StoragePluginConfig>> it = pluginStore.load(); it.hasNext(); ) {
      Entry<String, StoragePluginConfig> stored = it.next();
      configs.put(stored.getKey(), stored.getValue());
    }
    return configs;
  }

  /**
   * Refreshes the cache against the persistent store, then collects the
   * configs of all cached plugins that report themselves as stored.
   *
   * @return a new map of plugin name to config
   */
  @Override
  public Map<String, StoragePluginConfig> enabledConfigs() {
    refresh();
    Map<String, StoragePluginConfig> enabled = new HashMap<>();
    for (PluginHandle handle : pluginCache) {
      if (!handle.isStored()) {
        continue;
      }
      enabled.put(handle.name(), handle.config());
    }
    return enabled;
  }

  /**
   * Resolves the storage plugin for {@code storageConfig} and asks it
   * for the format plugin matching {@code formatConfig}.
   *
   * @param storageConfig config identifying the storage plugin
   * @param formatConfig config identifying the format plugin
   * @return the format plugin returned by the storage plugin
   * @throws ExecutionSetupException if the storage plugin lookup fails
   */
  @Override
  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
    return getPlugin(storageConfig).getFormatPlugin(formatConfig);
  }

  /**
   * @return the schema factory created together with this registry
   */
  @Override
  public SchemaFactory getSchemaFactory() {
    return this.schemaFactory;
  }

  /**
   * Adapts an iterator over plugin handles into an iterator over
   * (name, plugin) entries. Each call to {@link #next()} asks the handle
   * for its plugin.
   * <p>
   * TODO: Remove this: it will force plugins to be instantiated
   * unnecessarily
   */
  private static class PluginIterator implements Iterator<Entry<String, StoragePlugin>> {
    private final Iterator<PluginHandle> handles;

    public PluginIterator(Iterator<PluginHandle> base) {
      this.handles = base;
    }

    @Override
    public boolean hasNext() {
      return handles.hasNext();
    }

    @Override
    public Entry<String, StoragePlugin> next() {
      PluginHandle handle = handles.next();
      String pluginName = handle.name();
      StoragePlugin plugin = handle.plugin();
      return new ImmutableEntry<>(pluginName, plugin);
    }
  }

  /**
   * @return an iterator of (name, plugin) entries over the plugin cache
   */
  @Override
  public Iterator<Entry<String, StoragePlugin>> iterator() {
    Iterator<PluginHandle> cachedHandles = pluginCache.iterator();
    return new PluginIterator(cachedHandles);
  }

  /**
   * Shuts the registry down in order: invalidates all ephemeral plugins,
   * then closes the plugin cache, the persistent store and each locator.
   */
  @Override
  public synchronized void close() throws Exception {
    ephemeralPlugins.invalidateAll();
    pluginCache.close();
    pluginStore.close();
    for (ConnectorLocator locator : locators) {
      locator.close();
    }
  }

  /**
   * Creates a plugin entry for the given {@code name} and {@code pluginConfig}.
   * Validation for existence, disabled, etc. should have been done by the caller.
   * <p>
   * The connector is looked up by the config's class, and the connector
   * then creates the plugin entry. Creation of the plugin instance is
   * deferred until first requested. This should speed up Drillbit start,
   * as long as other code only asks for the plugin instance when it is
   * actually needed to plan or execute a query (not just to provide a
   * schema.)
   *
   * @param name name of the plugin
   * @param pluginConfig plugin configuration
   * @param type the kind of plugin entry to create
   * @return handle to the plugin with metadata and deferred access to
   * the plugin instance
   * @throws UserException if no connector is registered for the
   * config's class
   */
  private PluginHandle createPluginEntry(String name, StoragePluginConfig pluginConfig, PluginType type) {
    ConnectorHandle connector = connectors.get(pluginConfig.getClass());
    if (connector != null) {
      return connector.pluginEntryFor(name, pluginConfig, type);
    }
    throw UserException.internalError()
      .message("No connector known for plugin configuration")
      .addContext("Plugin name", name)
      .addContext("Config class", pluginConfig.getClass().getName())
      .build(logger);
  }

  /**
   * Refreshes the cache against the persistent store, then asks every
   * plugin returned by the cache to register its schemas under
   * {@code parent}. A failure in one plugin is logged as a warning and
   * does not stop the others.
   * <p>
   * TODO: Replace this. Inefficient to obtain schemas we don't need.
   *
   * @param schemaConfig schema configuration passed through to each plugin
   * @param parent the schema under which plugins register their schemas
   */
  protected void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
    // Refresh against the persistent store.
    // TODO: This will hammer the system if queries come in rapidly.
    // Need some better solution: grace period, alert from ZK that there
    // is something new, etc. Even better, don't register all the schemas.
    refresh();

    // Register schemas with the refreshed plugins
    // TODO: this code requires instantiating all plugins, even though
    // the query won't use them. Need a way to do deferred registration.
    for (PluginHandle handle : pluginCache.plugins()) {
      try {
        StoragePlugin instance = handle.plugin();
        instance.registerSchemas(schemaConfig, parent);
      } catch (Exception e) {
        logger.warn("Error during `{}` schema initialization: {}", handle.name(), e.getMessage(), e.getCause());
      }
    }
  }
}
