// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.service;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.impala.analysis.BoolLiteral;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TableName;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TCacheJarParams;
import org.apache.impala.thrift.TCacheJarResult;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TExprBatch;
import org.apache.impala.thrift.TGetPartitionStatsRequest;
import org.apache.impala.thrift.TGetPartitionStatsResponse;
import org.apache.impala.thrift.TParseDateStringResult;
import org.apache.impala.thrift.TPrioritizeLoadRequest;
import org.apache.impala.thrift.TPrioritizeLoadResponse;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TResultRow;
import org.apache.impala.thrift.TSymbolLookupParams;
import org.apache.impala.thrift.TSymbolLookupResult;
import org.apache.impala.thrift.TTable;
import org.apache.impala.util.NativeLibUtil;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

/**
 * This class provides the Impala executor functionality to the FE.
 * fe-support.cc implements all the native calls.
 * If the planner is executed inside Impalad, Impalad would have registered all the JNI
 * native functions already. There's no need to load the shared library.
 * For unit test (mvn test), load the shared library because the native function has not
 * been loaded yet.
 */
public class FeSupport {
  private final static Logger LOG = LoggerFactory.getLogger(FeSupport.class);
  private static boolean loaded_ = false;

  // Only called if this library is explicitly loaded. This only happens
  // when running FE tests.
  public native static void NativeFeTestInit();

  // Returns a serialized TResultRow
  public native static byte[] NativeEvalExprsWithoutRow(
      byte[] thriftExprBatch, byte[] thriftQueryGlobals, long maxResultSize);

  // Returns a serialized TSymbolLookupResult
  public native static byte[] NativeLookupSymbol(byte[] thriftSymbolLookup);

  // Returns a serialized TCacheJarResult
  public native static byte[] NativeCacheJar(byte[] thriftCacheJar);

  // Adds a topic item to the backend's pending metadata-topic update.
  // 'serializationBuffer' is a serialized TCatalogObject.
  // The return value is true if the operation succeeds and false otherwise.
  public native static boolean NativeAddPendingTopicItem(long nativeCatalogServerPtr,
      String key, long version, byte[] serializationBuffer, boolean deleted);

  // Get a catalog object update from the backend. A pair of isDeletion flag and
  // serialized TCatalogObject is returned.
  public native static Pair<Boolean, ByteBuffer> NativeGetNextCatalogObjectUpdate(
      long nativeIteratorPtr);

  // The return value is true if the operation succeeds and false otherwise.
  public native static boolean NativeLibCacheSetNeedsRefresh(String hdfsLocation);
  public native static boolean NativeLibCacheRemoveEntry(String hdfsLibFile);

  // The two methods below make RPCs to the Catalog Server. To keep our kerberos
  // configuration consolidated, we make make all RPCs in the BE layer instead of calling
  // the Catalog Server using Java Thrift bindings.

  // Does an RPC to the Catalog Server to prioritize the metadata loading of a
  // one or more catalog objects.
  public native static byte[] NativePrioritizeLoad(byte[] thriftReq);

  public native static byte[] NativeGetPartialCatalogObject(byte[] thriftReq)
      throws InternalException;

  // Does an RPC to the Catalog Server to fetch specified table partition statistics.
  public native static byte[] NativeGetPartitionStats(byte[] thriftReq);

  public native static byte[] NativeUpdateTableUsage(byte[] thriftReq);

  // Does an RPC to the Catalog Server to check if the given user is a Sentry admin.
  public native static byte[] NativeSentryAdminCheck(byte[] thriftReq);

  // Parses a string of comma-separated key=value query options ('csvQueryOptions'),
  // updates the existing query options ('queryOptions') with them and returns the
  // resulting serialized TQueryOptions object.
  // A note about the function's interface: ideally we wouldn't have to pass in the
  // existing query options. We could just return the newly set query options to the
  // caller and let the caller update the existing query options with the new ones.
  // Unfortunately due to a bug in the thrift-generated TQueryOptions class, in some cases
  // it is impossible to figure out whether a query option has been set explicitly or left
  // at its default setting, therefore this approach would not work.
  public native static byte[] NativeParseQueryOptions(String csvQueryOptions,
      byte[] queryOptions);

  public native static int MinLogSpaceForBloomFilter(long ndv, double fpp);

  // Parses date string, verifies if it is valid and returns the resulting
  // TParseDateStringResult object. Different date string variations are accepted.
  // E.g.: '2011-01-01', '2011-01-1', '2011-1-01', '2011-01-01'.
  public native static byte[] nativeParseDateString(String date);

  /**
   * Locally caches the jar at the specified HDFS location.
   *
   * @param hdfsLocation The path to the jar in HDFS
   * @return The result of the call to cache the jar, includes a status and the local
   *         path of the cached jar if the operation was successful.
   */
  public static TCacheJarResult CacheJar(String hdfsLocation) throws InternalException {
    Preconditions.checkNotNull(hdfsLocation);
    TCacheJarParams params = new TCacheJarParams(hdfsLocation);
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    byte[] result;
    try {
      result = CacheJar(serializer.serialize(params));
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TCacheJarResult thriftResult = new TCacheJarResult();
      deserializer.deserialize(thriftResult, result);
      return thriftResult;
    } catch (TException e) {
      // this should never happen
      throw new InternalException(
          "Couldn't cache jar at HDFS location " + hdfsLocation, e);
    }
  }

  private static byte[] CacheJar(byte[] thriftParams) {
    try {
      return NativeCacheJar(thriftParams);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeCacheJar(thriftParams);
  }

  /**
   * If it is known that the size of the evaluated expression is fixed, e.g.,
   * the size of an integer, then this method will be called to perform the evaluation.
   * Otherwise, the method EvalExprWithoutRowBounded that takes an additional argument
   * specifying the upper bound on the size of evaluated expression should be invoked.
   */
  public static TColumnValue EvalExprWithoutRow(Expr expr, TQueryCtx queryCtx)
    throws InternalException {
    return EvalExprWithoutRowBounded(expr, queryCtx, 0);
  }

  public static TColumnValue EvalExprWithoutRowBounded(Expr expr, TQueryCtx queryCtx,
    int maxResultSize) throws InternalException {
    Preconditions.checkState(!expr.contains(SlotRef.class));
    TExprBatch exprBatch = new TExprBatch();
    exprBatch.addToExprs(expr.treeToThrift());
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    byte[] result;
    try {
      result = EvalExprsWithoutRowBounded(
          serializer.serialize(exprBatch), serializer.serialize(queryCtx), maxResultSize);
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TResultRow val = new TResultRow();
      deserializer.deserialize(val, result);
      if (val.getColValsSize() != 1) {
        throw new IllegalStateException(String.format("Illegal expr eval result. " +
            "Expr=%s\nTExpBatch=%s\nResult=%s", expr.toSql(), exprBatch, val));
      }
      return val.getColVals().get(0);
    } catch (TException e) {
      // this should never happen
      throw new InternalException("couldn't execute expr " + expr.toSql(), e);
    }
  }

  private static byte[] LookupSymbol(byte[] thriftParams) {
    try {
      return NativeLookupSymbol(thriftParams);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeLookupSymbol(thriftParams);
  }

  public static TSymbolLookupResult LookupSymbol(TSymbolLookupParams params)
      throws InternalException {
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    try {
      byte[] resultBytes = LookupSymbol(serializer.serialize(params));
      Preconditions.checkNotNull(resultBytes);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TSymbolLookupResult result = new TSymbolLookupResult();
      deserializer.deserialize(result, resultBytes);
      return result;
    } catch (TException e) {
      // this should never happen
      throw new InternalException("couldn't perform symbol lookup.", e);
    }
  }

  /**
   * If it is known that the size of the evaluated expression is fixed, e.g.,
   * the size of an integer, then this method will be called to perform the evaluation.
   * Otherwise, the method EvalExprsWithoutRowBounded that takes an additional argument
   * specifying the upper bound on the size of rewritten expression should be invoked.
   */
  private static byte[] EvalExprsWithoutRow(
      byte[] thriftExprBatch, byte[] thriftQueryContext) {
    return EvalExprsWithoutRowBounded(thriftExprBatch, thriftQueryContext, 0);
  }

  private static byte[] EvalExprsWithoutRowBounded(
      byte[] thriftExprBatch, byte[] thriftQueryContext, int maxResultSize) {
    try {
      return NativeEvalExprsWithoutRow(thriftExprBatch, thriftQueryContext,
        maxResultSize);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeEvalExprsWithoutRow(thriftExprBatch, thriftQueryContext, maxResultSize);
  }

  public static boolean EvalPredicate(Expr pred, TQueryCtx queryCtx)
      throws InternalException {
    // Shortcuts to avoid expensive BE evaluation.
    if (pred instanceof BoolLiteral) return ((BoolLiteral) pred).getValue();
    if (Expr.IS_NULL_LITERAL.apply(pred)) return false;
    Preconditions.checkState(pred.getType().isBoolean());
    TColumnValue val = EvalExprWithoutRow(pred, queryCtx);
    // Return false if pred evaluated to false or NULL. True otherwise.
    return val.isBool_val() && val.bool_val;
  }

  /**
   * Evaluate a batch of predicates in the BE. The results are stored in a
   * TResultRow object, where each TColumnValue in it stores the result of
   * a predicate evaluation.
   *
   * TODO: This function is currently used for improving the performance of
   * partition pruning (see IMPALA-887), hence it only supports boolean
   * exprs. In the future, we can extend it to support arbitrary exprs without
   * SlotRefs.
   */
  public static TResultRow EvalPredicateBatch(List<Expr> exprs,
      TQueryCtx queryCtx) throws InternalException {
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    TExprBatch exprBatch = new TExprBatch();
    for (Expr expr: exprs) {
      // Make sure we only process boolean exprs.
      Preconditions.checkState(expr.getType().isBoolean());
      Preconditions.checkState(!expr.contains(SlotRef.class));
      exprBatch.addToExprs(expr.treeToThrift());
    }
    byte[] result;
    try {
      result = EvalExprsWithoutRow(
          serializer.serialize(exprBatch), serializer.serialize(queryCtx));
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TResultRow val = new TResultRow();
      deserializer.deserialize(val, result);
      return val;
    } catch (TException e) {
      // this should never happen
      throw new InternalException("couldn't execute a batch of exprs.", e);
    }
  }

  private static byte[] PrioritizeLoad(byte[] thriftReq) {
    try {
      return NativePrioritizeLoad(thriftReq);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativePrioritizeLoad(thriftReq);
  }

  public static void PrioritizeLoad(Set<TableName> tableNames)
      throws InternalException {
    Preconditions.checkNotNull(tableNames);

    LOG.info(String.format("Requesting prioritized load of table(s): %s",
        Joiner.on(", ").join(tableNames)));

    List<TCatalogObject> objectDescs = new ArrayList<TCatalogObject>(tableNames.size());
    for (TableName tableName: tableNames) {
      TCatalogObject catalogObject = new TCatalogObject();
      catalogObject.setType(TCatalogObjectType.TABLE);
      catalogObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
      objectDescs.add(catalogObject);
    }

    TPrioritizeLoadRequest request = new TPrioritizeLoadRequest ();
    request.setHeader(new TCatalogServiceRequestHeader());
    request.setObject_descs(objectDescs);

    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    try {
      byte[] result = PrioritizeLoad(serializer.serialize(request));
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TPrioritizeLoadResponse response = new TPrioritizeLoadResponse();
      deserializer.deserialize(response, result);
      if (response.getStatus().getStatus_code() != TErrorCode.OK) {
        throw new InternalException("Error requesting prioritized load: " +
            Joiner.on("\n").join(response.getStatus().getError_msgs()));
      }
    } catch (TException e) {
      // this should never happen
      throw new InternalException("Error processing request: " + e.getMessage(), e);
    }
  }

  private static byte[] GetPartitionStats(byte[] thriftReq) {
    try {
      return NativeGetPartitionStats(thriftReq);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeGetPartitionStats(thriftReq);
  }

  public static TGetPartitionStatsResponse GetPartitionStats(
      TableName table) throws InternalException {
    Preconditions.checkNotNull(table);

    LOG.info("Fetching partition statistics for table {} from catalog.", table);

    TGetPartitionStatsRequest request = new TGetPartitionStatsRequest();
    request.setTable_name(table.toThrift());
    TGetPartitionStatsResponse response = new TGetPartitionStatsResponse();
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    try {
      byte[] result = GetPartitionStats(serializer.serialize(request));
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());

      deserializer.deserialize(response, result);
      if (response.getStatus().getStatus_code() != TErrorCode.OK) {
        throw new InternalException("Error requesting GetPartitionStats: "
            + Joiner.on("\n").join(response.getStatus().getError_msgs()));
      }
    } catch (TException e) {
      // this should never happen
      throw new InternalException("Error processing request: " + e.getMessage(), e);
    }
    int numPartitionsFetched = 0;
    if (response.isSetPartition_stats()) {
      numPartitionsFetched = response.partition_stats.size();
    }
    LOG.info("Fetched statistics for {} partitions of table {}.", numPartitionsFetched,
        table);
    return response;
  }

  private static byte[] ParseQueryOptions(String csvQueryOptions, byte[] queryOptions) {
    try {
      return NativeParseQueryOptions(csvQueryOptions, queryOptions);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeParseQueryOptions(csvQueryOptions, queryOptions);
  }

  /**
   * Parses a string of comma-separated key=value query options. Returns a TQueryOptions
   * object that contains the updated query options.
   */
  public static TQueryOptions ParseQueryOptions(String csvQueryOptions,
      TQueryOptions queryOptions) throws InternalException {
    Preconditions.checkNotNull(csvQueryOptions);
    Preconditions.checkNotNull(queryOptions);

    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    try {
      byte[] result = ParseQueryOptions(csvQueryOptions,
          serializer.serialize(queryOptions));
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TQueryOptions updatedQueryOptions = new TQueryOptions();
      deserializer.deserialize(updatedQueryOptions, result);
      return updatedQueryOptions;
    } catch (TException e) {
      throw new InternalException("Could not parse query options: " + e.getMessage(), e);
    }
  }

  /**
   * Returns the log (base 2) of the minimum number of bytes we need for a Bloom
   * filter with 'ndv' unique elements and a false positive probability of less
   * than 'fpp'.
   */
  public static int GetMinLogSpaceForBloomFilter(long ndv, double fpp) {
    try {
      return MinLogSpaceForBloomFilter(ndv, fpp);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return MinLogSpaceForBloomFilter(ndv, fpp);
  }

  public static byte[] GetPartialCatalogObject(byte[] thriftReq)
      throws InternalException {
    try {
      return NativeGetPartialCatalogObject(thriftReq);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeGetPartialCatalogObject(thriftReq);
  }

  public static byte[] CheckSentryAdmin(byte[] thriftReq) {
    try {
      return NativeSentryAdminCheck(thriftReq);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return NativeSentryAdminCheck(thriftReq);
  }

  private static byte[] parseDateStringUtil(String date) {
    try {
      return nativeParseDateString(date);
    } catch (UnsatisfiedLinkError e) {
      loadLibrary();
    }
    return nativeParseDateString(date);
  }

  /**
   * Parses date string, verifies if it is valid and returns the resulting
   * TParseDateStringResult object. Different date string variations are accepted.
   * E.g.: '2011-01-01', '2011-01-1', '2011-1-01', '2011-01-01'.
   */
  public static TParseDateStringResult parseDateString(String date)
      throws InternalException {
    Preconditions.checkNotNull(date);
    try {
      byte[] result = parseDateStringUtil(date);
      Preconditions.checkNotNull(result);
      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
      TParseDateStringResult res = new TParseDateStringResult();
      deserializer.deserialize(res, result);
      return res;
    } catch (TException e) {
      throw new InternalException("Could not parse date string: " + e.getMessage(), e);
    }
  }

  /**
   * This function should be called explicitly by the FeSupport to ensure that
   * native functions are loaded. Tests that depend on JniCatalog or JniFrontend
   * being instantiated should also call this function.
   */
  public static synchronized void loadLibrary() {
    if (loaded_) return;
    LOG.info("Loading libfesupport.so");
    NativeLibUtil.loadLibrary("libfesupport.so");
    LOG.info("Loaded libfesupport.so");
    loaded_ = true;
    NativeFeTestInit();
  }
}
