/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.core.plan.maker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.core.plan.AcquireReleaseColumnsSegmentPlanNode;
import org.apache.pinot.core.plan.AggregationPlanNode;
import org.apache.pinot.core.plan.CombinePlanNode;
import org.apache.pinot.core.plan.DistinctPlanNode;
import org.apache.pinot.core.plan.GlobalPlanImplV0;
import org.apache.pinot.core.plan.GroupByPlanNode;
import org.apache.pinot.core.plan.InstanceResponsePlanNode;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.plan.SelectionPlanNode;
import org.apache.pinot.core.plan.StreamingInstanceResponsePlanNode;
import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}.
 *
 * <p>Instance-level limits (execution threads, group-by capacities and trim sizes) are captured once at construction
 * time. Every time an instance plan is made, they are merged with the options carried by the query and written onto
 * the {@link QueryContext} before any segment plan node is created.
 */
public class InstancePlanMakerImplV2 implements PlanMaker {
  // Instance config key for maximum number of threads used to execute the query
  // Set as pinot.server.query.executor.max.execution.threads
  public static final String MAX_EXECUTION_THREADS_KEY = "max.execution.threads";
  // Non-positive value means no instance-level cap is applied on top of the query option (see applyQueryOptions())
  public static final int DEFAULT_MAX_EXECUTION_THREADS = -1;

  // Instance config key for the upper bound on the initial capacity of the group-by result holder
  public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity";
  public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
  // Instance config key for the limit on number of groups stored for each segment
  public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
  public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

  // Instance config key for minimum segment-level group trim size
  // Set as pinot.server.query.executor.min.segment.group.trim.size
  public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size";
  public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1;
  // Instance config key for minimum server-level group trim size
  // Caution: Setting it to non-positive value (disable trim) or large value can give more accurate result, but can
  //          potentially cause memory issue
  // Set as pinot.server.query.executor.min.server.group.trim.size
  public static final String MIN_SERVER_GROUP_TRIM_SIZE_KEY = "min.server.group.trim.size";
  public static final int DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE = GroupByUtils.DEFAULT_MIN_NUM_GROUPS;
  // Instance config key for the group-by trim threshold, must be positive
  // Set as pinot.server.query.executor.groupby.trim.threshold
  public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
  public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;

  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);

  // Instance-level cap on execution threads; non-positive means the query option (if any) is used as-is
  private final int _maxExecutionThreads;
  private final int _maxInitialResultHolderCapacity;
  // Limit on number of groups stored for each segment, beyond which no new group will be created
  private final int _numGroupsLimit;
  // Used for SQL GROUP BY (server combine)
  private final int _minSegmentGroupTrimSize;
  private final int _minServerGroupTrimSize;
  private final int _groupByTrimThreshold;

  /**
   * Creates a plan maker that uses the default value for every instance-level setting.
   */
  @VisibleForTesting
  public InstancePlanMakerImplV2() {
    this(DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, DEFAULT_NUM_GROUPS_LIMIT, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
        DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, DEFAULT_GROUPBY_TRIM_THRESHOLD);
  }

  /**
   * Creates a plan maker with explicit group-by settings and the default execution thread limit.
   * <p>NOTE(review): unlike the {@link QueryExecutorConfig} based constructor, the arguments are stored without any
   * validation.
   *
   * @param maxInitialResultHolderCapacity Limit on the initial result holder capacity
   * @param numGroupsLimit Limit on number of groups stored for each segment
   * @param minSegmentGroupTrimSize Minimum segment-level group trim size
   * @param minServerGroupTrimSize Minimum server-level group trim size
   * @param groupByTrimThreshold Group-by trim threshold
   */
  @VisibleForTesting
  public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int numGroupsLimit, int minSegmentGroupTrimSize,
      int minServerGroupTrimSize, int groupByTrimThreshold) {
    _maxExecutionThreads = DEFAULT_MAX_EXECUTION_THREADS;
    _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
    _numGroupsLimit = numGroupsLimit;
    _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
    _minServerGroupTrimSize = minServerGroupTrimSize;
    _groupByTrimThreshold = groupByTrimThreshold;
  }

  /**
   * Constructor for usage when client requires to pass {@link QueryExecutorConfig} to this class.
   * <ul>
   *   <li>Set limit on the initial result holder capacity</li>
   *   <li>Set limit on number of groups returned from each segment and combined result</li>
   * </ul>
   * Each setting falls back to its <code>DEFAULT_*</code> constant when the corresponding key is not configured.
   *
   * @param queryExecutorConfig Query executor configuration
   * @throws IllegalStateException if the initial result holder capacity exceeds the number of groups limit, or if the
   *         group-by trim threshold is not positive
   */
  public InstancePlanMakerImplV2(QueryExecutorConfig queryExecutorConfig) {
    PinotConfiguration config = queryExecutorConfig.getConfig();
    _maxExecutionThreads = config.getProperty(MAX_EXECUTION_THREADS_KEY, DEFAULT_MAX_EXECUTION_THREADS);
    _maxInitialResultHolderCapacity =
        config.getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY, DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
    _numGroupsLimit = config.getProperty(NUM_GROUPS_LIMIT_KEY, DEFAULT_NUM_GROUPS_LIMIT);
    // Guava Preconditions only substitute "%s" placeholders; any other specifier is left in the message verbatim
    Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
        "Invalid configuration: maxInitialResultHolderCapacity: %s must be smaller or equal to numGroupsLimit: %s",
        _maxInitialResultHolderCapacity, _numGroupsLimit);
    _minSegmentGroupTrimSize = config.getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE_KEY, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
    _minServerGroupTrimSize = config.getProperty(MIN_SERVER_GROUP_TRIM_SIZE_KEY, DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE);
    _groupByTrimThreshold = config.getProperty(GROUPBY_TRIM_THRESHOLD_KEY, DEFAULT_GROUPBY_TRIM_THRESHOLD);
    Preconditions.checkState(_groupByTrimThreshold > 0,
        "Invalid configuration: groupByTrimThreshold: %s must be positive", _groupByTrimThreshold);
    LOGGER.info("Initializing plan maker with maxExecutionThreads: {}, maxInitialResultHolderCapacity: {}, "
            + "numGroupsLimit: {}, minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}, "
            + "groupByTrimThreshold: {}", _maxExecutionThreads, _maxInitialResultHolderCapacity, _numGroupsLimit,
        _minSegmentGroupTrimSize, _minServerGroupTrimSize, _groupByTrimThreshold);
  }

  /**
   * Makes the instance-level plan: one plan node per segment, combined under a {@link CombinePlanNode} and wrapped in
   * an {@link InstanceResponsePlanNode}.
   * <p>When prefetch is enabled on the query, each segment plan node is wrapped in an
   * {@link AcquireReleaseColumnsSegmentPlanNode} together with a {@link FetchContext} listing the columns of that
   * segment the query needs; otherwise no fetch context is created.
   * <p>The query context is modified in place by this method (query options are applied to it).
   *
   * @param indexSegments Segments to query
   * @param queryContext Query context, updated in place
   * @param executorService Executor service handed to the combine plan node
   * @param serverMetrics Server metrics (not used by this implementation)
   * @return Plan for the given segments
   */
  @Override
  public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
      ExecutorService executorService, ServerMetrics serverMetrics) {
    applyQueryOptions(queryContext);

    int numSegments = indexSegments.size();
    List<PlanNode> planNodes = new ArrayList<>(numSegments);
    List<FetchContext> fetchContexts;

    if (queryContext.isEnablePrefetch()) {
      fetchContexts = new ArrayList<>(numSegments);
      List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
      for (IndexSegment indexSegment : indexSegments) {
        Set<String> columns;
        if (selectExpressions.size() == 1 && "*".equals(selectExpressions.get(0).getIdentifier())) {
          // 'SELECT *': the columns to fetch are the physical columns of this particular segment
          columns = indexSegment.getPhysicalColumnNames();
        } else {
          columns = queryContext.getColumns();
        }
        FetchContext fetchContext = new FetchContext(UUID.randomUUID(), indexSegment.getSegmentName(), columns);
        fetchContexts.add(fetchContext);
        planNodes.add(
            new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment,
                fetchContext));
      }
    } else {
      fetchContexts = Collections.emptyList();
      for (IndexSegment indexSegment : indexSegments) {
        planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
      }
    }

    // No stream observer for the non-streaming plan
    CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, executorService, null);
    return new GlobalPlanImplV0(
        new InstanceResponsePlanNode(combinePlanNode, indexSegments, fetchContexts, queryContext));
  }

  /**
   * Resolves the effective execution settings from the query options and the instance-level configuration, and sets
   * them on the given query context.
   * <ul>
   *   <li>skipUpsert, skipStarTree and skipScanFilterReorder are taken from the query options</li>
   *   <li>maxExecutionThreads: a positive query option is used, capped by the instance-level limit when that limit is
   *   positive; otherwise the instance-level value is used</li>
   *   <li>Group-by settings are only set for aggregation queries with group-by expressions; the trim sizes can be
   *   overridden by the query options, the other settings always come from the instance-level configuration</li>
   * </ul>
   *
   * @param queryContext Query context, updated in place
   */
  private void applyQueryOptions(QueryContext queryContext) {
    Map<String, String> queryOptions = queryContext.getQueryOptions();

    // Set skipUpsert
    queryContext.setSkipUpsert(QueryOptionsUtils.isSkipUpsert(queryOptions));

    // Set skipStarTree
    queryContext.setSkipStarTree(QueryOptionsUtils.isSkipStarTree(queryOptions));

    // Set skipScanFilterReorder
    queryContext.setSkipScanFilterReorder(QueryOptionsUtils.isSkipScanFilterReorder(queryOptions));

    // Set maxExecutionThreads
    int maxExecutionThreads;
    Integer maxExecutionThreadsFromQuery = QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
    if (maxExecutionThreadsFromQuery != null && maxExecutionThreadsFromQuery > 0) {
      // Do not allow query to override the execution threads over the instance-level limit
      if (_maxExecutionThreads > 0) {
        maxExecutionThreads = Math.min(_maxExecutionThreads, maxExecutionThreadsFromQuery);
      } else {
        maxExecutionThreads = maxExecutionThreadsFromQuery;
      }
    } else {
      // Missing or non-positive query option: fall back to the instance-level value
      maxExecutionThreads = _maxExecutionThreads;
    }
    queryContext.setMaxExecutionThreads(maxExecutionThreads);

    // Set group-by query options
    if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() != null) {

      // Set maxInitialResultHolderCapacity
      queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);

      // Set numGroupsLimit
      queryContext.setNumGroupsLimit(_numGroupsLimit);

      // Set minSegmentGroupTrimSize
      Integer minSegmentGroupTrimSizeFromQuery = QueryOptionsUtils.getMinSegmentGroupTrimSize(queryOptions);
      int minSegmentGroupTrimSize =
          minSegmentGroupTrimSizeFromQuery != null ? minSegmentGroupTrimSizeFromQuery : _minSegmentGroupTrimSize;
      queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSize);

      // Set minServerGroupTrimSize
      Integer minServerGroupTrimSizeFromQuery = QueryOptionsUtils.getMinServerGroupTrimSize(queryOptions);
      int minServerGroupTrimSize =
          minServerGroupTrimSizeFromQuery != null ? minServerGroupTrimSizeFromQuery : _minServerGroupTrimSize;
      queryContext.setMinServerGroupTrimSize(minServerGroupTrimSize);

      // Set groupTrimThreshold
      queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
    }
  }

  /**
   * Makes the plan node for a single segment based on the query type: group-by, aggregation, selection or distinct.
   * <p>The expression override hints of the query are applied to the query context (in place) before the plan node is
   * created.
   *
   * @param indexSegment Segment to query
   * @param queryContext Query context, may be rewritten in place
   * @return Plan node for the segment
   */
  @Override
  public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
    rewriteQueryContextWithHints(queryContext, indexSegment);
    if (QueryContextUtils.isAggregationQuery(queryContext)) {
      List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
      if (groupByExpressions != null) {
        // Group-by query
        return new GroupByPlanNode(indexSegment, queryContext);
      } else {
        // Aggregation query
        return new AggregationPlanNode(indexSegment, queryContext);
      }
    } else if (QueryContextUtils.isSelectionQuery(queryContext)) {
      return new SelectionPlanNode(indexSegment, queryContext);
    } else {
      // Anything that is neither aggregation nor selection is expected to be a distinct query
      assert QueryContextUtils.isDistinctQuery(queryContext);
      return new DistinctPlanNode(indexSegment, queryContext);
    }
  }

  /**
   * Makes the instance-level plan for the streaming response path.
   * <p>Selection-only queries are streamed by the per-segment {@link StreamingSelectionPlanNode}, so the top node only
   * returns the metadata block; all other queries get a {@link StreamingInstanceResponsePlanNode} on top. No fetch
   * context is created on this path.
   * <p>The query context is modified in place by this method (query options are applied to it).
   *
   * @param indexSegments Segments to query
   * @param queryContext Query context, updated in place
   * @param executorService Executor service handed to the combine plan node
   * @param streamObserver Observer the response blocks are streamed to
   * @param serverMetrics Server metrics (not used by this implementation)
   * @return Streaming plan for the given segments
   */
  @Override
  public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
      ExecutorService executorService, StreamObserver<Server.ServerResponse> streamObserver,
      ServerMetrics serverMetrics) {
    applyQueryOptions(queryContext);

    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
    for (IndexSegment indexSegment : indexSegments) {
      planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext));
    }
    CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, executorService, streamObserver);
    if (QueryContextUtils.isSelectionOnlyQuery(queryContext)) {
      // selection-only is streamed in StreamingSelectionPlanNode --> here only metadata block is returned.
      return new GlobalPlanImplV0(
          new InstanceResponsePlanNode(combinePlanNode, indexSegments, Collections.emptyList(), queryContext));
    } else {
      // non-selection-only requires a StreamingInstanceResponsePlanNode to stream data block back and metadata block
      // as final return.
      return new GlobalPlanImplV0(
          new StreamingInstanceResponsePlanNode(combinePlanNode, indexSegments, Collections.emptyList(), queryContext,
              streamObserver));
    }
  }

  /**
   * Makes the plan node for a single segment on the streaming response path.
   *
   * @param indexSegment Segment to query
   * @param queryContext Query context
   * @return A {@link StreamingSelectionPlanNode} for selection-only queries, otherwise the regular segment plan node
   */
  @Override
  public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
    if (!QueryContextUtils.isSelectionOnlyQuery(queryContext)) {
      // non-selection-only query goes through normal SegmentPlan.
      // it will be stream back via StreamingInstanceResponsePlanNode
      return makeSegmentPlanNode(indexSegment, queryContext);
    } else {
      // Selection-only query can be directly stream back
      return new StreamingSelectionPlanNode(indexSegment, queryContext);
    }
  }

  /**
   * In-place rewrite QueryContext based on the information from local IndexSegment.
   * <p>Does nothing when the query carries no expression override hints. Otherwise the select, group-by and order-by
   * expressions as well as the left-hand sides of the filter and having filter predicates are passed through
   * {@link #overrideWithExpressionHints(ExpressionContext, IndexSegment, Map)}.
   * <p>NOTE(review): the plan maker passes the same query context for every segment, so a rewrite triggered by one
   * segment stays visible when the following segments are planned - confirm this is intended.
   *
   * @param queryContext Query context to rewrite in place
   * @param indexSegment Segment whose columns decide whether a hint is applicable
   */
  @VisibleForTesting
  public static void rewriteQueryContextWithHints(QueryContext queryContext, IndexSegment indexSegment) {
    Map<ExpressionContext, ExpressionContext> expressionOverrideHints = queryContext.getExpressionOverrideHints();
    if (MapUtils.isEmpty(expressionOverrideHints)) {
      return;
    }

    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
    selectExpressions.replaceAll(
        expression -> overrideWithExpressionHints(expression, indexSegment, expressionOverrideHints));

    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
    if (CollectionUtils.isNotEmpty(groupByExpressions)) {
      groupByExpressions.replaceAll(
          expression -> overrideWithExpressionHints(expression, indexSegment, expressionOverrideHints));
    }

    List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
    if (CollectionUtils.isNotEmpty(orderByExpressions)) {
      // Rebuild each order-by entry around the (possibly overridden) expression, keeping its ordering direction
      orderByExpressions.replaceAll(expression -> new OrderByExpressionContext(
          overrideWithExpressionHints(expression.getExpression(), indexSegment, expressionOverrideHints),
          expression.isAsc()));
    }

    // In-place override
    FilterContext filter = queryContext.getFilter();
    if (filter != null) {
      overrideWithExpressionHints(filter, indexSegment, expressionOverrideHints);
    }

    // In-place override
    FilterContext havingFilter = queryContext.getHavingFilter();
    if (havingFilter != null) {
      overrideWithExpressionHints(havingFilter, indexSegment, expressionOverrideHints);
    }
  }

  /**
   * Applies the expression override hints to a filter in place: recurses into the children of a filter that has
   * children, and replaces the left-hand side of the predicate of a filter that has none.
   *
   * @param filter Filter to rewrite in place
   * @param indexSegment Segment whose columns decide whether a hint is applicable
   * @param expressionOverrideHints Map from original expression to override expression
   */
  @VisibleForTesting
  public static void overrideWithExpressionHints(FilterContext filter, IndexSegment indexSegment,
      Map<ExpressionContext, ExpressionContext> expressionOverrideHints) {
    if (filter.getChildren() != null) {
      // AND, OR, NOT
      for (FilterContext child : filter.getChildren()) {
        overrideWithExpressionHints(child, indexSegment, expressionOverrideHints);
      }
    } else {
      // PREDICATE
      Predicate predicate = filter.getPredicate();
      predicate.setLhs(overrideWithExpressionHints(predicate.getLhs(), indexSegment, expressionOverrideHints));
    }
  }

  /**
   * Applies the expression override hints to a single expression.
   * <ul>
   *   <li>A non-function expression is returned as is</li>
   *   <li>A function expression with a hint that is an identifier contained in the segment's column names is replaced
   *   by that identifier expression</li>
   *   <li>Otherwise the arguments of the function are rewritten recursively in place, and the same expression
   *   instance is returned</li>
   * </ul>
   *
   * @param expression Expression to rewrite
   * @param indexSegment Segment whose columns decide whether a hint is applicable
   * @param expressionOverrideHints Map from original expression to override expression
   * @return The override expression when a hint applies, the (possibly modified) input expression otherwise
   */
  @VisibleForTesting
  public static ExpressionContext overrideWithExpressionHints(ExpressionContext expression, IndexSegment indexSegment,
      Map<ExpressionContext, ExpressionContext> expressionOverrideHints) {
    if (expression.getType() != ExpressionContext.Type.FUNCTION) {
      return expression;
    }
    ExpressionContext overrideExpression = expressionOverrideHints.get(expression);
    // Only apply the hint when it points to a column that exists in this segment
    if (overrideExpression != null && overrideExpression.getIdentifier() != null && indexSegment.getColumnNames()
        .contains(overrideExpression.getIdentifier())) {
      return overrideExpression;
    }
    expression.getFunction().getArguments()
        .replaceAll(argument -> overrideWithExpressionHints(argument, indexSegment, expressionOverrideHints));
    return expression;
  }
}
