// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.planner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;

import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BoolLiteral;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.FunctionParams;
import org.apache.impala.analysis.InPredicate;
import org.apache.impala.analysis.IsNullPredicate;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.StringLiteral;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TKuduScanNode;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRange;
import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.util.KuduUtil;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduPredicate.ComparisonOp;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder;
import org.apache.kudu.client.LocatedTablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

/**
 * Scan of a single Kudu table.
 *
 * Extracts predicates that can be pushed down to Kudu. Currently Kudu can evaluate
 * binary comparison predicates between a slot ref and a literal, IN-list predicates
 * (but not NOT IN) of a slot ref against literals, and IS [NOT] NULL on a slot ref.
 *
 * Uses the Kudu ScanToken API to generate a set of Kudu "scan tokens" which are used for
 * scheduling and initializing the scanners. Each scan token is an opaque object that
 * represents a scan for some Kudu data on a tablet (currently one token represents one
 * tablet). It contains the tablet locations and all information needed to produce a
 * Kudu scanner, including the projected columns and predicates that are pushed down.
 *
 * After KUDU-1065 is resolved, Kudu will also prune the tablets that don't need to be
 * scanned, and tokens will only be returned for the tablets that do need to be scanned.
 */
public class KuduScanNode extends ScanNode {
  // Logger for this class.
  private final static Logger LOG = LoggerFactory.getLogger(KuduScanNode.class);

  // The Kudu table scanned by this node. Taken from the tuple descriptor's table in the
  // constructor.
  private final FeKuduTable kuduTable_;

  // True if this scan node should use the MT implementation in the backend.
  // Set in computeNodeResourceProfile().
  private boolean useMtScanNode_;

  // Indexes for the set of hosts that will be used for the query.
  // From analyzer.getHostIndex().getIndex(address)
  // Populated in computeScanRangeLocations().
  private final Set<Integer> hostIndexSet_ = new HashSet<>();

  // List of conjuncts that can be pushed down to Kudu. Used for computing stats and
  // explain strings. Populated by the tryConvert*KuduPredicate() methods.
  private final List<Expr> kuduConjuncts_ = new ArrayList<>();

  // Exprs in kuduConjuncts_ converted to KuduPredicates. Added to the scan token builder
  // in createScanTokens().
  private final List<KuduPredicate> kuduPredicates_ = new ArrayList<>();

  // Slot that is used to record the Kudu metadata for the count(*) aggregation if
  // this scan node has the count(*) optimization enabled. Set in init(); stays null
  // when the optimization is not applied.
  private SlotDescriptor countStarSlot_ = null;

  /**
   * Creates a scan node for the table referenced by the tuple descriptor 'desc'. That
   * table must be a FeKuduTable, otherwise the cast below fails. 'conjuncts' and
   * 'aggInfo' are stored in conjuncts_ and aggInfo_ respectively.
   */
  public KuduScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
      AggregateInfo aggInfo) {
    super(id, desc, "SCAN KUDU");
    aggInfo_ = aggInfo;
    conjuncts_ = conjuncts;
    kuduTable_ = (FeKuduTable) desc_.getTable();
  }

  /**
   * Returns optimizedAggSmap_. toThrift() checks that this map is non-null exactly when
   * a count(*) slot has been set, so it may be null.
   */
  public ExprSubstitutionMap getOptimizedAggSmap() {
    return optimizedAggSmap_;
  }

  /**
   * Initializes this scan node. In order: sorts the conjuncts by cost, opens the Kudu
   * table and validates its schema against the scan slots, applies the count(*)
   * optimization if possible, extracts the conjuncts that can be pushed down to Kudu,
   * materializes the slots of the remaining conjuncts, computes the mem layout and the
   * scan range locations, and finally computes the stats.
   *
   * @throws ImpalaRuntimeException if any step inside the try block fails; the original
   *     exception is attached as the cause.
   */
  @Override
  public void init(Analyzer analyzer) throws ImpalaRuntimeException {
    conjuncts_ = orderConjunctsByCost(conjuncts_);

    // NOTE(review): the client is not closed in this method; presumably its lifecycle is
    // managed by KuduUtil - confirm.
    KuduClient client = KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts());
    try {
      org.apache.kudu.client.KuduTable rpcTable =
          client.openTable(kuduTable_.getKuduTableName());
      validateSchema(rpcTable);

      // The count(*) optimization is attempted before any conjunct is pushed down, so
      // kuduConjuncts_ must still be empty at this point.
      if (canApplyCountStarOptimization(analyzer)) {
        Preconditions.checkState(desc_.getPath().destTable() != null);
        Preconditions.checkState(kuduConjuncts_.isEmpty());
        countStarSlot_ = applyCountStarOptimization(analyzer);
      }

      // Extract predicates that can be evaluated by Kudu.
      extractKuduConjuncts(analyzer, client, rpcTable);

      // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu)
      analyzer.materializeSlots(conjuncts_);

      // Compute mem layout before the scan range locations because creation of the Kudu
      // scan tokens depends on having a mem layout.
      computeMemLayout(analyzer);

      // Creates Kudu scan tokens and sets the scan range locations.
      computeScanRangeLocations(analyzer, client, rpcTable);
    } catch (Exception e) {
      // All failures, including ImpalaRuntimeExceptions thrown by the helpers above and
      // failed Preconditions checks, are wrapped with the original exception as cause.
      throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
    }
    // Uses hostIndexSet_, which was filled in by computeScanRangeLocations() above.
    computeStats(analyzer);
  }

  /**
   * Validates that the columns Impala expects are actually in the Kudu table. For every
   * scan slot of this node's tuple descriptor, the corresponding Kudu column must
   * exist and must have the same type and nullability that Impala expects.
   *
   * @param rpcTable the Kudu table as returned by the Kudu client.
   * @throws ImpalaRuntimeException if a column cannot be looked up, or if its type or
   *     nullability differs from what Impala expects. For a failed lookup the
   *     underlying exception is attached as the cause.
   */
  private void validateSchema(org.apache.kudu.client.KuduTable rpcTable)
      throws ImpalaRuntimeException {
    Schema tableSchema = rpcTable.getSchema();
    for (SlotDescriptor desc: getTupleDesc().getSlots()) {
      // Only slots that are read from the table need a matching Kudu column.
      if (!desc.isScanSlot()) continue;
      String colName = ((KuduColumn) desc.getColumn()).getKuduName();
      Type colType = desc.getColumn().getType();
      ColumnSchema kuduCol;
      try {
        kuduCol = tableSchema.getColumn(colName);
      } catch (Exception e) {
        // Preserve the original exception as the cause so that the underlying Kudu
        // client error is not lost when diagnosing the failure.
        throw new ImpalaRuntimeException("Column '" + colName + "' not found in kudu " +
            "table " + rpcTable.getName() + ". The table metadata in Impala may be " +
            "outdated and need to be refreshed.", e);
      }

      Type kuduColType =
          KuduUtil.toImpalaType(kuduCol.getType(), kuduCol.getTypeAttributes());
      if (!colType.equals(kuduColType)) {
        throw new ImpalaRuntimeException("Column '" + colName + "' is type " +
            kuduColType.toSql() + " but Impala expected " + colType.toSql() +
            ". The table metadata in Impala may be outdated and need to be refreshed.");
      }

      if (desc.getIsNullable() != kuduCol.isNullable()) {
        // 'expected' is Impala's view of the column, 'actual' is what Kudu reports.
        String expected;
        String actual;
        if (desc.getIsNullable()) {
          expected = "nullable";
          actual = "not nullable";
        } else {
          expected = "not nullable";
          actual = "nullable";
        }
        throw new ImpalaRuntimeException("Column '" + colName + "' is " + actual +
            " but Impala expected it to be " + expected +
            ". The table metadata in Impala may be outdated and need to be refreshed.");
      }
    }
  }

  /**
   * Builds scanRangeSpecs_ from the Kudu scan tokens of this scan. Each token becomes one
   * concrete scan range that carries the serialized token and one location per tablet
   * replica. The host index of every replica is also recorded in hostIndexSet_.
   *
   * @throws ImpalaRuntimeException if a tablet has no replicas or a token cannot be
   *     serialized.
   */
  private void computeScanRangeLocations(Analyzer analyzer,
      KuduClient client, org.apache.kudu.client.KuduTable rpcTable)
      throws ImpalaRuntimeException {
    scanRangeSpecs_ = new TScanRangeSpec();

    for (KuduScanToken scanToken: createScanTokens(client, rpcTable)) {
      LocatedTablet locatedTablet = scanToken.getTablet();
      if (locatedTablet.getReplicas().isEmpty()) {
        throw new ImpalaRuntimeException(String.format(
            "At least one tablet does not have any replicas. Tablet ID: %s",
            new String(locatedTablet.getTabletId(), Charsets.UTF_8)));
      }

      // One location per replica, identified by the host's index in the global list.
      List<TScanRangeLocation> replicaLocations = new ArrayList<>();
      for (LocatedTablet.Replica replica: locatedTablet.getReplicas()) {
        TNetworkAddress replicaAddress =
            new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort());
        Integer hostIdx = analyzer.getHostIndex().getIndex(replicaAddress);
        replicaLocations.add(new TScanRangeLocation(hostIdx));
        hostIndexSet_.add(hostIdx);
      }

      byte[] serializedToken;
      try {
        serializedToken = scanToken.serialize();
      } catch (IOException e) {
        throw new ImpalaRuntimeException("Unable to serialize Kudu scan token=" +
            scanToken.toString(), e);
      }
      TScanRange kuduScanRange = new TScanRange();
      kuduScanRange.setKudu_scan_token(serializedToken);

      TScanRangeLocationList rangeWithLocations = new TScanRangeLocationList();
      rangeWithLocations.setScan_range(kuduScanRange);
      rangeWithLocations.setLocations(replicaLocations);
      scanRangeSpecs_.addToConcrete_ranges(rangeWithLocations);
    }
  }

  /**
   * Builds the KuduScanTokens for this scan. The tokens project the Kudu columns of all
   * slots except the count(*) optimization slot, in the order of the slots' offsets in
   * the Impala tuple so that the Impala and Kudu tuple layouts are identical, and they
   * carry every predicate in kuduPredicates_.
   */
  private List<KuduScanToken> createScanTokens(KuduClient client,
      org.apache.kudu.client.KuduTable rpcTable) {
    List<String> projectedColNames = new ArrayList<>();
    for (SlotDescriptor slotDesc: getTupleDesc().getSlotsOrderedByOffset()) {
      // The count(*) slot is not projected from Kudu.
      if (isCountStarOptimizationDescriptor(slotDesc)) continue;
      KuduColumn kuduColumn = (KuduColumn) slotDesc.getColumn();
      projectedColNames.add(kuduColumn.getKuduName());
    }

    KuduScanTokenBuilder builder = client.newScanTokenBuilder(rpcTable);
    builder.setProjectedColumnNames(projectedColNames);
    for (KuduPredicate kuduPredicate: kuduPredicates_) {
      builder.addPredicate(kuduPredicate);
    }
    return builder.build();
  }

  /**
   * Returns the combined selectivity of all conjuncts of this scan, i.e. both the ones
   * remaining in conjuncts_ and the ones pushed down to Kudu (kuduConjuncts_).
   */
  @Override
  protected double computeSelectivity() {
    List<Expr> combined = new ArrayList<>();
    for (Expr conjunct: conjuncts_) combined.add(conjunct);
    for (Expr kuduConjunct: kuduConjuncts_) combined.add(kuduConjunct);
    return computeCombinedSelectivity(combined);
  }

  /**
   * Computes numNodes_, inputCardinality_ and cardinality_ for this scan. The number of
   * nodes is the number of distinct hosts recorded in hostIndexSet_ (at least 1). The
   * cardinality starts from the table's row count, then has the conjuncts' selectivity
   * applied and is capped at the limit.
   */
  @Override
  protected void computeStats(Analyzer analyzer) {
    super.computeStats(analyzer);

    // Reflect the hosts that have relevant data; never report fewer than one node.
    int numHosts = hostIndexSet_.size();
    numNodes_ = Math.max(1, numHosts);

    // Start from the table's row count and narrow it down.
    cardinality_ = kuduTable_.getNumRows();
    inputCardinality_ = cardinality_;
    cardinality_ = applyConjunctsSelectivity(cardinality_);
    cardinality_ = capCardinalityAtLimit(cardinality_);

    if (!LOG.isTraceEnabled()) return;
    LOG.trace("computeStats KuduScan: cardinality=" + Long.toString(cardinality_));
  }

  /**
   * Sets useMtScanNode_ (true iff mt_dop > 0) and nodeResourceProfile_. The memory
   * estimate is a per-scanner-thread estimate multiplied by the maximum number of
   * scanner threads; the thread reservation is 0 for the MT scan node and 1 otherwise.
   */
  @Override
  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
    // Most of the memory of a Kudu scan node goes to the RowbatchQueue and to the row
    // batches that scanner threads have filled but not yet queued. Variable length
    // string columns, the mem pool usage pattern, and the varying number of scanner
    // threads and average queue size make a precise estimate difficult, so empirically
    // derived per-column and per-thread estimates from the backend config are used.
    int perHostScanRanges =
        estimatePerHostScanRanges(scanRangeSpecs_.getConcrete_rangesSize());
    int maxScannerThreads =
        computeMaxNumberOfScannerThreads(queryOptions, perHostScanRanges);

    int numCols = desc_.getSlots().size();
    long bytesPerColumnPerThread = BackendConfig.INSTANCE.getBackendCfg().
        kudu_scanner_thread_estimated_bytes_per_column;
    long maxBytesPerThread = BackendConfig.INSTANCE.getBackendCfg().
        kudu_scanner_thread_max_estimated_bytes;
    // The per-thread estimate grows with the number of columns, up to a configured cap.
    long memEstimatePerThread =
        Math.min(numCols * bytesPerColumnPerThread, maxBytesPerThread);

    useMtScanNode_ = queryOptions.mt_dop > 0;
    int threadReservation = useMtScanNode_ ? 0 : 1;
    nodeResourceProfile_ = new ResourceProfileBuilder()
        .setMemEstimateBytes(memEstimatePerThread * maxScannerThreads)
        .setThreadReservation(threadReservation)
        .build();
  }

  /**
   * Returns the explain string of this node. The first line has the node id, display
   * name, full table name and, if present, the explicit alias. For every detail level
   * other than MINIMAL it is followed by the non-empty lists of remaining predicates,
   * Kudu predicates and runtime filters, each prefixed with 'detailPrefix'.
   */
  @Override
  protected String getNodeExplainString(String prefix, String detailPrefix,
      TExplainLevel detailLevel) {
    String aliasStr = "";
    if (desc_.hasExplicitAlias()) aliasStr = " " + desc_.getAlias();

    StringBuilder output = new StringBuilder();
    output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_,
        kuduTable_.getFullName(), aliasStr));

    switch (detailLevel) {
      case MINIMAL:
        // Only the header line is printed.
        break;
      case STANDARD:
      case EXTENDED:
      case VERBOSE:
        if (!conjuncts_.isEmpty()) {
          output.append(detailPrefix).append("predicates: ")
              .append(Expr.getExplainString(conjuncts_, detailLevel)).append("\n");
        }
        if (!kuduConjuncts_.isEmpty()) {
          output.append(detailPrefix).append("kudu predicates: ")
              .append(Expr.getExplainString(kuduConjuncts_, detailLevel)).append("\n");
        }
        if (!runtimeFilters_.isEmpty()) {
          output.append(detailPrefix).append("runtime filters: ")
              .append(getRuntimeFilterExplainString(false, detailLevel));
        }
        break;
    }
    return output.toString();
  }

  /**
   * Fills in the Kudu-specific parts of the thrift plan node: the node type, the tuple
   * id, whether to use the MT scan node and, if the count(*) optimization is enabled,
   * the byte offset of the count(*) slot.
   */
  @Override
  protected void toThrift(TPlanNode node) {
    node.node_type = TPlanNodeType.KUDU_SCAN_NODE;
    TKuduScanNode kuduScanNode = new TKuduScanNode(desc_.getId().asInt());
    kuduScanNode.setUse_mt_scan_node(useMtScanNode_);
    node.kudu_scan_node = kuduScanNode;

    // The substitution map and the count(*) slot must be set (or unset) together.
    boolean hasAggSmap = optimizedAggSmap_ != null;
    boolean hasCountStarSlot = countStarSlot_ != null;
    Preconditions.checkState(hasAggSmap == hasCountStarSlot);
    if (!hasCountStarSlot) return;
    kuduScanNode.setCount_star_slot_offset(countStarSlot_.getByteOffset());
  }

  /**
   * Moves the conjuncts that Kudu can evaluate out of conjuncts_. Each conjunct is
   * tried, in this order, as a binary comparison, an IN-list and an IS [NOT] NULL
   * predicate. When a conversion succeeds, the expr has been added to kuduConjuncts_
   * and the corresponding KuduPredicate to kuduPredicates_ by the conversion method,
   * and the expr is removed from conjuncts_. The 'client' parameter is currently
   * unused.
   */
  private void extractKuduConjuncts(Analyzer analyzer,
      KuduClient client, org.apache.kudu.client.KuduTable rpcTable) {
    for (ListIterator<Expr> conjunctIt = conjuncts_.listIterator();
         conjunctIt.hasNext();) {
      Expr conjunct = conjunctIt.next();
      // Short-circuit evaluation guarantees that at most one conversion succeeds.
      boolean pushedDown = tryConvertBinaryKuduPredicate(analyzer, rpcTable, conjunct)
          || tryConvertInListKuduPredicate(analyzer, rpcTable, conjunct)
          || tryConvertIsNullKuduPredicate(analyzer, rpcTable, conjunct);
      if (pushedDown) conjunctIt.remove();
    }
  }

  /**
   * Tries to convert the binary comparison 'expr' into a KuduPredicate. The conversion
   * is only attempted for predicates of the form "SlotRef op literal" where 'op' has a
   * Kudu equivalent and the literal is not NULL. On success, adds the expr to
   * kuduConjuncts_ and the new predicate to kuduPredicates_ and returns true. Otherwise
   * returns false without modifying either list.
   */
  private boolean tryConvertBinaryKuduPredicate(Analyzer analyzer,
      org.apache.kudu.client.KuduTable table, Expr expr) {
    if (!(expr instanceof BinaryPredicate)) return false;
    BinaryPredicate binaryPred = (BinaryPredicate) expr;

    // TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef.

    ComparisonOp kuduOp = getKuduOperator(binaryPred.getOp());
    if (kuduOp == null) return false;

    // Only "SlotRef op literal" is supported.
    if (!(binaryPred.getChild(0) instanceof SlotRef)) return false;
    if (!(binaryPred.getChild(1) instanceof LiteralExpr)) return false;
    SlotRef slotRef = (SlotRef) binaryPred.getChild(0);
    LiteralExpr literal = (LiteralExpr) binaryPred.getChild(1);

    // Cannot push predicates with null literal values (KUDU-1595).
    if (Expr.IS_NULL_LITERAL.apply(literal)) return false;

    KuduColumn impalaCol = (KuduColumn) slotRef.getDesc().getColumn();
    ColumnSchema kuduCol = table.getSchema().getColumn(impalaCol.getKuduName());

    // Stays null for literal types that are not handled below.
    KuduPredicate converted = null;
    switch (literal.getType().getPrimitiveType()) {
      case BOOLEAN:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            ((BoolLiteral)literal).getValue());
        break;
      case TINYINT:
      case SMALLINT:
      case INT:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            ((NumericLiteral)literal).getIntValue());
        break;
      case BIGINT:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            ((NumericLiteral)literal).getLongValue());
        break;
      case FLOAT:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            (float)((NumericLiteral)literal).getDoubleValue());
        break;
      case DOUBLE:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            ((NumericLiteral)literal).getDoubleValue());
        break;
      case STRING:
      case VARCHAR:
      case CHAR:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            ((StringLiteral)literal).getUnescapedValue());
        break;
      case TIMESTAMP:
        try {
          // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
          converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
              KuduUtil.timestampToUnixTimeMicros(analyzer, literal));
        } catch (Exception e) {
          // A timestamp that cannot be converted is simply not pushed down.
          LOG.info("Exception converting Kudu timestamp predicate: " + expr.toSql(), e);
          return false;
        }
        break;
      case DECIMAL:
        converted = KuduPredicate.newComparisonPredicate(kuduCol, kuduOp,
            ((NumericLiteral)literal).getValue());
        break;
      default:
        break;
    }
    if (converted == null) return false;

    kuduConjuncts_.add(binaryPred);
    kuduPredicates_.add(converted);
    return true;
  }

  /**
   * Tries to convert the IN-list 'expr' into a KuduPredicate. The conversion is only
   * attempted for IN (not NOT IN) predicates whose first child is a SlotRef and whose
   * remaining children are all non-NULL literals that getKuduInListValue() can
   * convert. On success, adds the new predicate to kuduPredicates_ and the expr to
   * kuduConjuncts_ and returns true. Otherwise returns false without modifying either
   * list.
   */
  private boolean tryConvertInListKuduPredicate(Analyzer analyzer,
      org.apache.kudu.client.KuduTable table, Expr expr) {
    if (!(expr instanceof InPredicate)) return false;
    InPredicate inPred = (InPredicate) expr;

    // Only convert IN predicates, i.e. cannot convert NOT IN.
    if (inPred.isNotIn()) return false;

    // Do not convert if there is an implicit cast.
    if (!(inPred.getChild(0) instanceof SlotRef)) return false;
    SlotRef slotRef = (SlotRef) inPred.getChild(0);

    // KuduPredicate takes a list of values as Objects. Child 0 is the SlotRef, the
    // list elements start at child 1.
    List<Object> inListValues = new ArrayList<>();
    for (int childIdx = 1; childIdx < inPred.getChildren().size(); ++childIdx) {
      if (!Expr.IS_LITERAL.apply(inPred.getChild(childIdx))) return false;
      LiteralExpr element = (LiteralExpr) inPred.getChild(childIdx);

      // Cannot push predicates with null literal values (KUDU-1595).
      if (Expr.IS_NULL_LITERAL.apply(element)) return false;

      Object kuduValue = getKuduInListValue(analyzer, element);
      if (kuduValue == null) return false;
      inListValues.add(kuduValue);
    }

    KuduColumn impalaCol = (KuduColumn) slotRef.getDesc().getColumn();
    ColumnSchema kuduCol = table.getSchema().getColumn(impalaCol.getKuduName());
    KuduPredicate inListPredicate = KuduPredicate.newInListPredicate(kuduCol, inListValues);
    kuduPredicates_.add(inListPredicate);
    kuduConjuncts_.add(inPred);
    return true;
  }

  /**
   * Tries to convert the IS NULL / IS NOT NULL 'expr' into a KuduPredicate. The
   * conversion is only attempted when the operand is a plain SlotRef. On success, adds
   * the expr to kuduConjuncts_ and the new predicate to kuduPredicates_ and returns
   * true. Otherwise returns false.
   */
  private boolean tryConvertIsNullKuduPredicate(Analyzer analyzer,
      org.apache.kudu.client.KuduTable table, Expr expr) {
    if (!(expr instanceof IsNullPredicate)) return false;
    IsNullPredicate isNullPred = (IsNullPredicate) expr;

    // Anything more than a SlotRef is not converted. This includes casts, because
    // certain casts can turn a non-NULL value into a NULL. For example,
    // CAST('test' as tinyint) is NULL.
    if (!(isNullPred.getChild(0) instanceof SlotRef)) return false;
    SlotRef slotRef = (SlotRef) isNullPred.getChild(0);

    KuduColumn impalaCol = (KuduColumn) slotRef.getDesc().getColumn();
    ColumnSchema kuduCol = table.getSchema().getColumn(impalaCol.getKuduName());
    KuduPredicate nullnessPredicate = isNullPred.isNotNull()
        ? KuduPredicate.newIsNotNullPredicate(kuduCol)
        : KuduPredicate.newIsNullPredicate(kuduCol);

    kuduConjuncts_.add(isNullPred);
    kuduPredicates_.add(nullnessPredicate);
    return true;
  }

  /**
   * Returns the value of the IN-list literal 'e' as an Object that can be added to a
   * Kudu IN-list predicate. The Java type of the returned Object depends on the
   * primitive type of the literal. Returns null if a TIMESTAMP literal cannot be
   * converted; that failure is logged. A literal whose type is not handled below
   * fails a Preconditions check, i.e. throws IllegalStateException.
   */
  private static Object getKuduInListValue(Analyzer analyzer, LiteralExpr e) {
    switch (e.getType().getPrimitiveType()) {
      case BOOLEAN:
        return ((BoolLiteral) e).getValue();
      case TINYINT:
        return (byte) ((NumericLiteral) e).getLongValue();
      case SMALLINT:
        return (short) ((NumericLiteral) e).getLongValue();
      case INT:
        return (int) ((NumericLiteral) e).getLongValue();
      case BIGINT:
        return ((NumericLiteral) e).getLongValue();
      case FLOAT:
        return (float) ((NumericLiteral) e).getDoubleValue();
      case DOUBLE:
        return ((NumericLiteral) e).getDoubleValue();
      case STRING:
        return ((StringLiteral) e).getUnescapedValue();
      case DECIMAL:
        return ((NumericLiteral) e).getValue();
      case TIMESTAMP:
        try {
          // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
          return KuduUtil.timestampToUnixTimeMicros(analyzer, e);
        } catch (Exception ex) {
          LOG.info("Exception converting Kudu timestamp expr: " + e.toSql(), ex);
          return null;
        }
      default:
        // Always fails: the literal type is not expected here.
        Preconditions.checkState(false,
            "Unsupported Kudu type considered for predicate: %s", e.getType().toSql());
        return null;
    }
  }

  /**
   * Maps the BinaryPredicate operator 'op' to the equivalent Kudu comparison operator.
   * Only EQ, LT, LE, GT and GE have an equivalent; null is returned for every other
   * operator.
   */
  private static KuduPredicate.ComparisonOp getKuduOperator(BinaryPredicate.Operator op) {
    ComparisonOp kuduOp;
    switch (op) {
      case EQ: kuduOp = ComparisonOp.EQUAL; break;
      case LT: kuduOp = ComparisonOp.LESS; break;
      case LE: kuduOp = ComparisonOp.LESS_EQUAL; break;
      case GT: kuduOp = ComparisonOp.GREATER; break;
      case GE: kuduOp = ComparisonOp.GREATER_EQUAL; break;
      default: kuduOp = null; break;
    }
    return kuduOp;
  }

  /**
   * Returns true if at least one conjunct of this scan has been pushed down to Kudu,
   * i.e. kuduConjuncts_ is not empty.
   */
  @Override
  public boolean hasStorageLayerConjuncts() {
    boolean pushedToKudu = !kuduConjuncts_.isEmpty();
    return pushedToKudu;
  }
}
