/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.parquet;

import org.apache.calcite.rel.core.Filter;
import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
import org.apache.drill.exec.expr.FilterPredicate;
import com.google.common.base.Stopwatch;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public abstract class FilePushDownFilter extends StoragePluginOptimizerRule {

  private static final Logger logger = LoggerFactory.getLogger(FilePushDownFilter.class);

  private static final Collection<String> BANNED_OPERATORS;

  static {
    BANNED_OPERATORS = new ArrayList<>(1);
    BANNED_OPERATORS.add("flatten");
  }

  public static RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
    return new FilePushDownFilter(
        RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))),
        "FilePushDownFilter:Filter_On_Project", optimizerRulesContext) {

      @Override
      public boolean matches(RelOptRuleCall call) {
        final ScanPrel scan = call.rel(2);
        if (scan.getGroupScan().supportsFilterPushDown()) {
          return super.matches(call);
        }
        return false;
      }

      @Override
      public void onMatch(RelOptRuleCall call) {
        final FilterPrel filterRel = call.rel(0);
        final ProjectPrel projectRel = call.rel(1);
        final ScanPrel scanRel = call.rel(2);
        doOnMatch(call, filterRel, projectRel, scanRel);
      }

    };
  }

  public static StoragePluginOptimizerRule getFilterOnScan(OptimizerRulesContext optimizerContext) {
    return new FilePushDownFilter(
        RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
        "FilePushDownFilter:Filter_On_Scan", optimizerContext) {

      @Override
      public boolean matches(RelOptRuleCall call) {
        final ScanPrel scan = call.rel(1);
        if (scan.getGroupScan().supportsFilterPushDown()) {
          return super.matches(call);
        }
        return false;
      }

      @Override
      public void onMatch(RelOptRuleCall call) {
        final FilterPrel filterRel = call.rel(0);
        final ScanPrel scanRel = call.rel(1);
        doOnMatch(call, filterRel, null, scanRel);
      }
    };
  }

  // private final boolean useNewReader;
  protected final OptimizerRulesContext optimizerContext;

  private FilePushDownFilter(RelOptRuleOperand operand, String id, OptimizerRulesContext optimizerContext) {
    super(operand, id);
    this.optimizerContext = optimizerContext;
  }

  protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel project, ScanPrel scan) {
    AbstractGroupScanWithMetadata<?> groupScan = (AbstractGroupScanWithMetadata<?>) scan.getGroupScan();
    if (groupScan.getFilter() != null && !groupScan.getFilter().equals(ValueExpressions.BooleanExpression.TRUE)) {
      return;
    }

    RexNode condition;
    if (project == null) {
      condition = filter.getCondition();
    } else {
      // get the filter as if it were below the projection.
      condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
    }

    if (condition == null || condition.isAlwaysTrue()) {
      return;
    }

    // get a conjunctions of the filter condition. For each conjunction, if it refers to ITEM or FLATTEN expression
    // then we could not pushed down. Otherwise, it's qualified to be pushed down.
    // Limits the number of nodes that can be created out of the conversion to avoid
    // exponential growth of nodes count and further OOM
    final List<RexNode> predList = RelOptUtil.conjunctions(RexUtil.toCnf(filter.getCluster().getRexBuilder(), 100, condition));

    final List<RexNode> qualifiedPredList = new ArrayList<>();

    // list of predicates which cannot be converted to filter predicate
    List<RexNode> nonConvertedPredList = new ArrayList<>();

    for (RexNode pred : predList) {
      if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) == null) {
        LogicalExpression drillPredicate = DrillOptiq.toDrill(
            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, pred);

        // checks whether predicate may be used for filter pushdown
        FilterPredicate<?> filterPredicate =
            groupScan.getFilterPredicate(drillPredicate,
                optimizerContext,
                optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions(), false);
        // collects predicates that contain unsupported for filter pushdown expressions
        // to build filter with them
        if (filterPredicate == null) {
          nonConvertedPredList.add(pred);
        }
        qualifiedPredList.add(pred);
      } else {
        nonConvertedPredList.add(pred);
      }
    }

    final RexNode qualifiedPred = RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);

    if (qualifiedPred == null) {
      return;
    }

    LogicalExpression conditionExp = DrillOptiq.toDrill(
        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifiedPred);

    // Default - pass the original filter expr to (potentially) be used at run-time
    groupScan.setFilterForRuntime(conditionExp, optimizerContext); // later may remove or set to another filter (see below)

    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
    AbstractGroupScanWithMetadata<?> newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
        optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
    if (timer != null) {
      logger.debug("Took {} ms to apply filter. ", timer.elapsed(TimeUnit.MILLISECONDS));
      timer.stop();
    }

    // For the case when newGroupScan wasn't created, the old one may
    // fully match the filter for the case when row group pruning did not happen.
    if (newGroupScan == null) {
      if (groupScan.isMatchAllMetadata()) {
        RelNode child = project == null ? scan : project;
        // If current row group fully matches filter,
        // but row group pruning did not happen, remove the filter.
        if (nonConvertedPredList.isEmpty()) {
          groupScan.setFilterForRuntime(null, optimizerContext); // disable the original filter expr (i.e. don't use it at run-time)
          call.transformTo(child);
        } else if (nonConvertedPredList.size() == predList.size()) {
          // None of the predicates participated in filter pushdown.
          return;
        } else {
          // If some of the predicates weren't used in the filter, creates new filter with them
          // on top of current scan. Excludes the case when all predicates weren't used in the filter.
          Filter theNewFilter  = filter.copy(filter.getTraitSet(), child,
            RexUtil.composeConjunction(
              filter.getCluster().getRexBuilder(),
              nonConvertedPredList,
              true));

          LogicalExpression filterPredicate = DrillOptiq.toDrill(
            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, theNewFilter.getCondition());

          groupScan.setFilterForRuntime(filterPredicate, optimizerContext); // pass the new filter expr to (potentialy) be used at run-time

          call.transformTo(theNewFilter); // Replace the child with the new filter on top of the child/scan
        }
      }
      return;
    }

    RelNode newNode = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());

    if (project != null) {
      newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
    }

    if (newGroupScan.isMatchAllMetadata()) {
      // creates filter from the expressions which can't be pushed to the scan
      if (!nonConvertedPredList.isEmpty()) {
        Filter theFilterRel  = filter.copy(filter.getTraitSet(), newNode,
          RexUtil.composeConjunction(
            filter.getCluster().getRexBuilder(),
            nonConvertedPredList,
            true));

        LogicalExpression filterPredicate = DrillOptiq.toDrill(
          new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, theFilterRel.getCondition());

        newGroupScan.setFilterForRuntime(filterPredicate, optimizerContext); // pass the new filter expr to (potentialy) be used at run-time

        newNode = theFilterRel; // replace the new node with the new filter on top of that new node
      }
      call.transformTo(newNode);
      return;
    }

    final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
    call.transformTo(newFilter);
  }
}
