/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.compile;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.server.options.OptionSet;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;

/**
 * Global code compiler mechanism shared by all threads and operators.
 * Holds a single cache of generated code (keyed by code source) to
 * prevent compiling identical code multiple times. Supports both
 * the byte-code merging and plain-old Java methods of code
 * generation and compilation.
 */

public class CodeCompiler {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodeCompiler.class);

  /**
   * Abstracts out the details of compiling code using the two available
   * mechanisms. Allows this mechanism to be unit tested separately from
   * the code cache.
   */
  public static class CodeGenCompiler {
    private final ClassTransformer transformer;
    private final ClassBuilder classBuilder;

    /**
     * Creates both compilation back ends from the same configuration.
     *
     * @param config the Drill configuration handed to both back ends
     * @param optionManager the option set handed to both back ends
     */
    public CodeGenCompiler(final DrillConfig config, final OptionSet optionManager) {
      transformer = new ClassTransformer(config, optionManager);
      classBuilder = new ClassBuilder(config, optionManager);
    }

    /**
     * Compile the code already generated by the code generator. The
     * back end is selected by {@code cg.isPlainJava()}.
     *
     * @param cg the code generator for the class
     * @return the compiled class
     * @throws Exception if anything goes wrong
     */
    public Class<?> compile(final CodeGenerator<?> cg) throws Exception {
      if (cg.isPlainJava()) {

        // Generate class as plain-old Java

        logger.trace("Class {} generated as plain Java", cg.getClassName());
        return classBuilder.getImplementationClass(cg);
      } else {

        // Generate class parts and assemble byte-codes.

        logger.trace("Class {} generated via byte-code manipulation", cg.getClassName());
        return transformer.getImplementationClass(cg);
      }
    }

    /**
     * Generate code for the code generator, then compile it.
     *
     * @param cg the code generator for the class
     * @return the compiled class
     * @throws Exception if anything goes wrong
     */
    public Class<?> generateAndCompile(final CodeGenerator<?> cg) throws Exception {
      cg.generate();
      return compile(cg);
    }
  }

  public static final String COMPILE_BASE = "drill.exec.compile";

  /**
   * Maximum size of the compiled class cache.
   */
  public static final String MAX_LOADING_CACHE_SIZE_CONFIG = COMPILE_BASE + ".cache_max_size";

  /**
   * Disables the code cache. Primarily for testing.
   */
  public static final String DISABLE_CACHE_CONFIG = COMPILE_BASE + ".disable_cache";

  /**
   * Enables saving generated code for debugging
   */
  public static final String ENABLE_SAVE_CODE_FOR_DEBUG_TOPN = COMPILE_BASE + ".codegen.debug.topn";

  /**
   * Prefer to generate code as plain Java when the code generator
   * supports that mechanism.
   */
  public static final String PREFER_POJ_CONFIG = CodeCompiler.COMPILE_BASE + ".prefer_plain_java";

  private final CodeGenCompiler codeGenCompiler;
  private final boolean useCache;

  // Metrics. This compiler is shared by all threads, and cache loads for
  // different keys may run concurrently, so the counters are atomic:
  // a plain int++ could lose updates.

  private final AtomicInteger classGenCount = new AtomicInteger();
  private final AtomicInteger cacheMissCount = new AtomicInteger();

  /**
   * Google Guava loading cache that defers creating a cache
   * entry until first needed. Creation is done in a thread-safe
   * way: if two threads try to create the same class at the same
   * time, the first does the work, the second waits for the first
   * to complete, then grabs the new entry.
   */
  private final LoadingCache<CodeGenerator<?>, GeneratedClassEntry> cache;
  private final boolean preferPlainJava;

  /**
   * Creates the compiler and its class cache. The cache is always built,
   * even when {@link #DISABLE_CACHE_CONFIG} is set; in that case it is
   * simply never consulted.
   *
   * @param config the Drill configuration providing the cache size,
   * the cache-disable flag and the plain-Java preference
   * @param optionManager the option set passed on to the code compilers
   */
  public CodeCompiler(final DrillConfig config, final OptionSet optionManager) {
    codeGenCompiler = new CodeGenCompiler(config, optionManager);
    useCache = ! config.getBoolean(DISABLE_CACHE_CONFIG);
    cache = CacheBuilder.newBuilder()
        .maximumSize(config.getInt(MAX_LOADING_CACHE_SIZE_CONFIG))
        .build(new Loader());
    preferPlainJava = config.getBoolean(PREFER_POJ_CONFIG);
    logger.info("Plain java code generation preferred: {}", preferPlainJava);
  }

  /**
   * Create a single instance of the generated class.
   *
   * @param cg code generator for the class to be instantiated.
   * @return an instance of the generated class
   * @throws ClassTransformationException general "something is wrong" exception
   * for the Drill compilation chain.
   */
  @SuppressWarnings("unchecked")
  public <T> T createInstance(final CodeGenerator<?> cg) throws ClassTransformationException {
    return (T) createInstances(cg, 1).get(0);
  }

  /**
   * Create multiple instances of the generated class. The code is
   * generated first, then the compiled class is obtained from the cache
   * (or compiled directly when the cache is disabled) and instantiated
   * {@code count} times.
   *
   * @param cg code generator for the class to be instantiated.
   * @param count the number of instances desired; zero or less yields
   * an empty list.
   * @return a list of instances of the generated class.
   * @throws ClassTransformationException general "something is wrong" exception
   * for the Drill compilation chain; wraps any exception raised while
   * obtaining the class or instantiating it.
   */
  @SuppressWarnings("unchecked")
  public <T> List<T> createInstances(final CodeGenerator<?> cg, int count) throws ClassTransformationException {
    if (preferPlainJava && cg.supportsPlainJava()) {
      cg.preferPlainJava(true);
    }
    cg.generate();
    classGenCount.incrementAndGet();
    try {
      final GeneratedClassEntry ce;
      if (useCache) {
        // get() compiles and stores the class on a miss, so this trace
        // is emitted for both hits and freshly loaded entries.
        ce = cache.get(cg);
        logger.trace("Class {} found in code cache", cg.getClassName());
      } else {
        ce = makeClass(cg);
      }
      List<T> tList = Lists.newArrayList();
      for (int i = 0; i < count; i++) {
        tList.add((T) ce.clazz.newInstance());
      }
      return tList;
    } catch (Exception e) {
      throw new ClassTransformationException(e);
    }
  }

  /**
   * Loader used to create an entry in the class cache when the entry
   * does not yet exist. Here, we compile the already generated code
   * and place the resulting class into the cache. The class has an
   * associated class loader which "dangles" from the class itself;
   * we don't keep track of the class loader itself.
   */
  private class Loader extends CacheLoader<CodeGenerator<?>, GeneratedClassEntry> {
    @Override
    public GeneratedClassEntry load(final CodeGenerator<?> cg) throws Exception {
      return makeClass(cg);
    }
  }

  /**
   * Called when the requested class does not exist in the cache and should
   * be compiled using the preferred code generation technique. Each call
   * is counted as a cache miss.
   *
   * @param cg the code generator for the class
   * @return a cache entry for the class. The entry holds the class and the
   * class holds onto its class loader (that is used to load any nested classes).
   * @throws Exception if anything goes wrong with compilation or byte-code
   * merge
   */
  private GeneratedClassEntry makeClass(final CodeGenerator<?> cg) throws Exception {
    cacheMissCount.incrementAndGet();
    return new GeneratedClassEntry(codeGenCompiler.compile(cg));
  }

  /**
   * Cache value wrapping a compiled class. Declared static so that cache
   * entries do not carry a hidden reference to the enclosing compiler.
   */
  private static final class GeneratedClassEntry {
    private final Class<?> clazz;

    public GeneratedClassEntry(final Class<?> clazz) {
      this.clazz = clazz;
    }
  }

  /**
   * Flush the compiled classes from the cache.
   *
   * <p>The cache has DrillbitContext lifetime, so the only way items go out of it
   * now is by being aged out because of the maximum cache size.
   *
   * <p>The intent of flushCache() is to make it possible to flush the cache for
   * testing purposes, although this could be used by users in case issues arise. If
   * that happens, remove the visible for testing annotation.
   */
  @VisibleForTesting
  public void flushCache() {
    cache.invalidateAll();
  }

  /**
   * Upon close, report the effectiveness of the code cache to the log.
   * The hit rate is the percentage of code generation requests that did
   * not lead to a compilation, rounded to the nearest integer.
   */
  public void close() {
    // Read each counter once so the logged numbers and the derived
    // hit rate are computed from the same values.
    final int genCount = classGenCount.get();
    final int missCount = cacheMissCount.get();
    int hitRate = 0;
    if (genCount > 0) {
      hitRate = (int) Math.round((genCount - missCount) * 100.0 / genCount);
    }
    logger.info("Stats: code gen count: {}, cache miss count: {}, hit rate: {}%",
                genCount, missCount, hitRate);
  }
}
