/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.plan.rewrite.rules;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlan.QueryBlock;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext;
import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
import org.apache.tajo.util.TUtil;

import java.util.*;

/**
 * This rule tries to push down all filter conditions into logical nodes as lower as possible.
 * It is likely to significantly reduces the intermediate data.
 */
public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownContext, LogicalNode>
    implements LogicalPlanRewriteRule {
  private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
  private static final String NAME = "FilterPushDown";

  private CatalogService catalog;

  static class FilterPushDownContext {
    Set<EvalNode> pushingDownFilters = new HashSet<>();
    LogicalPlanRewriteRuleContext rewriteRuleContext;

    public void clear() {
      pushingDownFilters.clear();
    }
    public void setFiltersTobePushed(Collection<EvalNode> workingEvals) {
      this.pushingDownFilters.clear();
      this.pushingDownFilters.addAll(workingEvals);
    }
    public void addFiltersTobePushed(Collection<EvalNode> workingEvals) {
      this.pushingDownFilters.addAll(workingEvals);
    }

    public void setToOrigin(Map<EvalNode, EvalNode> evalMap) {
      //evalMap: copy -> origin
      List<EvalNode> origins = TUtil.newList();
      for (EvalNode eval : pushingDownFilters) {
        EvalNode origin = evalMap.get(eval);
        if (origin != null) {
          origins.add(origin);
        }
      }
      setFiltersTobePushed(origins);
    }
  }

  @Override
  public String getName() {
    return NAME;
  }

  @Override
  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
    for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) {
      if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) throws TajoException {
    /*
    FilterPushDown rule: processing when visits each node
      - If a target which is corresponding on a filter EvalNode's column is not FieldEval, do not PushDown.
      - Replace filter EvalNode's column with child node's output column.
        If there is no child node's output column, do not PushDown.
      - When visit ScanNode, add filter eval to ScanNode's qual
      - When visit GroupByNode, Find aggregation column in a filter EvalNode and
        . If a parent is HavingNode, add filter eval to parent HavingNode.
        . It not, create new HavingNode and set parent's child.
     */
    FilterPushDownContext context = new FilterPushDownContext();
    context.rewriteRuleContext = rewriteRuleContext;
    LogicalPlan plan = rewriteRuleContext.getPlan();
    catalog = rewriteRuleContext.getCatalog();
    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
      context.clear();
      this.visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>());
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("=============================================");
      LOG.debug("FilterPushDown Optimized Query: \n" + plan.toString());
      LOG.debug("=============================================");
    }
    return plan;
  }

  @Override
  public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                 SelectionNode selNode, Stack<LogicalNode> stack) throws TajoException {
    context.pushingDownFilters.addAll(new HashSet<>(Arrays.asList(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual()))));

    stack.push(selNode);
    visit(context, plan, block, selNode.getChild(), stack);
    stack.pop();

    if(context.pushingDownFilters.size() == 0) {
      // remove the selection operator if there is no search condition after selection push.
      LogicalNode node = stack.peek();
      if (node instanceof UnaryNode) {
        UnaryNode unary = (UnaryNode) node;
        unary.setChild(selNode.getChild());
      } else {
        throw new TajoInternalError("The node must be an unary node");
      }
    } else { // if there remain search conditions

      // check if it can be evaluated here
      Set<EvalNode> matched = new HashSet<>();
      for (EvalNode eachEval : context.pushingDownFilters) {
        if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
          matched.add(eachEval);
        }
      }

      // if there are search conditions which can be evaluated here,
      // push down them and remove them from context.pushingDownFilters.
      if (matched.size() > 0) {
        selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
        context.pushingDownFilters.removeAll(matched);
      }
    }

    return selNode;
  }

  @Override
  public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                               JoinNode joinNode, Stack<LogicalNode> stack) throws TajoException {
    Set<EvalNode> onPredicates = new HashSet<>();
    if (joinNode.hasJoinQual()) {
      onPredicates.addAll(new HashSet<>(Arrays.asList(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()))));
    }
    // clear join qual
    joinNode.clearJoinQual();

    // we assume all the quals in pushingDownFilters as where predicates
    Set<EvalNode> nonPushableQuals = extractNonPushableJoinQuals(plan, block, joinNode, onPredicates,
        context.pushingDownFilters);
    // add every predicate and remove non-pushable ones
    context.pushingDownFilters.addAll(onPredicates);
    context.pushingDownFilters.removeAll(nonPushableQuals);

    LogicalNode left = joinNode.getLeftChild();
    LogicalNode right = joinNode.getRightChild();

    List<EvalNode> notMatched = TUtil.newList();
    // Join's input schema = right child output columns + left child output columns
    Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, block, joinNode, left, notMatched,
        null, 0);
    context.setFiltersTobePushed(transformedMap.keySet());
    visit(context, plan, block, left, stack);

    context.setToOrigin(transformedMap);
    context.addFiltersTobePushed(notMatched);

    notMatched.clear();
    transformedMap = findCanPushdownAndTransform(context, block, joinNode, right, notMatched, null,
        left.getOutSchema().size());
    context.setFiltersTobePushed(transformedMap.keySet());

    visit(context, plan, block, right, stack);

    context.setToOrigin(transformedMap);
    context.addFiltersTobePushed(notMatched);

    notMatched.clear();
    context.pushingDownFilters.addAll(nonPushableQuals);
    List<EvalNode> matched = TUtil.newList();

    // If the query involves a subquery, the stack can be empty.
    // In this case, this join is the top most one within a query block.
    boolean isTopMostJoin = stack.isEmpty() ? true : stack.peek().getType() != NodeType.JOIN;

    for (EvalNode evalNode : context.pushingDownFilters) {
      // TODO: currently, non-equi theta join is not supported yet.
      if (LogicalPlanner.isEvaluatableJoinQual(block, evalNode, joinNode, onPredicates.contains(evalNode),
          isTopMostJoin)) {
        matched.add(evalNode);
      }
    }

    EvalNode qual = null;
    if (matched.size() > 1) {
      // merged into one eval tree
      qual = AlgebraicUtil.createSingletonExprFromCNF(
          matched.toArray(new EvalNode[matched.size()]));
    } else if (matched.size() == 1) {
      // if the number of matched expr is one
      qual = matched.get(0);
    }

    if (qual != null) {
      joinNode.setJoinQual(qual);

      if (joinNode.getJoinType() == JoinType.CROSS) {
        joinNode.setJoinType(JoinType.INNER);
      }
    }

    context.pushingDownFilters.removeAll(matched);

    // The selection node for non-equi theta join conditions should be created here
    // to process those conditions as early as possible.
    // This should be removed after TAJO-742.
    if (context.pushingDownFilters.size() > 0) {
      List<EvalNode> nonEquiThetaJoinQuals = extractNonEquiThetaJoinQuals(context.pushingDownFilters, block, joinNode);
      if (nonEquiThetaJoinQuals.size() > 0) {
        SelectionNode selectionNode = createSelectionParentForNonEquiThetaJoinQuals(plan, block, stack, joinNode,
            nonEquiThetaJoinQuals);

        context.pushingDownFilters.removeAll(nonEquiThetaJoinQuals);
        return selectionNode;
      }
    }

    return joinNode;
  }

  private SelectionNode createSelectionParentForNonEquiThetaJoinQuals(LogicalPlan plan,
                                                                      QueryBlock block,
                                                                      Stack<LogicalNode> stack,
                                                                      JoinNode joinNode,
                                                                      List<EvalNode> nonEquiThetaJoinQuals) {
    SelectionNode selectionNode = plan.createNode(SelectionNode.class);
    selectionNode.setInSchema(joinNode.getOutSchema());
    selectionNode.setOutSchema(joinNode.getOutSchema());
    selectionNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(nonEquiThetaJoinQuals));
    block.registerNode(selectionNode);

    LogicalNode parent = stack.peek();
    if (parent instanceof UnaryNode) {
      ((UnaryNode) parent).setChild(selectionNode);
    } else if (parent instanceof BinaryNode) {
      BinaryNode binaryParent = (BinaryNode) parent;
      if (binaryParent.getLeftChild().getPID() == joinNode.getPID()) {
        binaryParent.setLeftChild(selectionNode);
      } else if (binaryParent.getRightChild().getPID() == joinNode.getPID()) {
        binaryParent.setRightChild(selectionNode);
      }
    } else if (parent instanceof TableSubQueryNode) {
      ((TableSubQueryNode) parent).setSubQuery(selectionNode);
    }

    selectionNode.setChild(joinNode);
    return selectionNode;
  }

  private static Set<EvalNode> extractNonPushableJoinQuals(final LogicalPlan plan,
                                                           final LogicalPlan.QueryBlock block,
                                                           final JoinNode joinNode,
                                                           final Set<EvalNode> onPredicates,
                                                           final Set<EvalNode> wherePredicates)
      throws TajoException {
    Set<EvalNode> nonPushableQuals = new HashSet<>();
    // TODO: non-equi theta join quals must not be pushed until TAJO-742 is resolved.
    nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(wherePredicates, block, joinNode));
    nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(onPredicates, block, joinNode));

    // for outer joins
    if (PlannerUtil.isOuterJoinType(joinNode.getJoinType())) {
      nonPushableQuals.addAll(extractNonPushableOuterJoinQuals(plan, onPredicates, wherePredicates, joinNode));
    }
    return nonPushableQuals;
  }

  /**
   * For outer joins, pushable predicates can be decided based on their locations in the SQL and types of referencing
   * relations.
   *
   * <h3>Table types</h3>
   * <ul>
   *   <li>Preserved Row table : The preserved row table refers to the table that preserves rows when there is no match
   *   in the join operation. Therefore, all rows from the preserved row table that qualify against the WHERE clause
   *   will be returned, regardless of whether there is a matched row in the join. For a left/right table, the preserved
   *   row table is the left/right table. For a full outer join, both tables are preserved row tables.</li>
   *   <li>Null Supplying table : The NULL-supplying table supplies NULLs when there is an unmatched row. Any column
   *   from the NULL-supplying table referred to in the SELECT list or subsequent WHERE or ON clause will contain NULL
   *   if there was no match in the join operation. For a left/right outer join, the NULL-supplying
   *   table is the right/left table. For a full outer join, both tables are NULL-supplying
   *   table. In a full outer join, both tables can preserve rows, and also can supply NULLs. This is significant,
   *   because there are rules that apply to purely preserved row tables that do not apply if the table can also supply
   *   NULLs.</li>
   * </ul>
   *
   * <h3>Predicate types</h3>
   * <ul>
   *   <li>During Join predicate : A predicate that is in the JOIN ON clause.</li>
   *   <li>After Join predicate : A predicate that is in the WHERE clause.</li>
   * </ul>
   *
   * <h3>Predicate Pushdown Rules</h3>
   * <ol>
   *   <li>During Join predicates cannot be pushed past Preserved Row tables.</li>
   *   <li>After Join predicates cannot be pushed past Null Supplying tables.</li>
   * </ol>
   */
  private static Set<EvalNode> extractNonPushableOuterJoinQuals(final LogicalPlan plan,
                                                                final Set<EvalNode> onPredicates,
                                                                final Set<EvalNode> wherePredicates,
                                                                final JoinNode joinNode) throws TajoException {
    Set<String> nullSupplyingTableNameSet = new HashSet<>();
    Set<String> preservedTableNameSet = new HashSet<>();
    String leftRelation = PlannerUtil.getTopRelationInLineage(plan, joinNode.getLeftChild());
    String rightRelation = PlannerUtil.getTopRelationInLineage(plan, joinNode.getRightChild());

    if (joinNode.getJoinType() == JoinType.LEFT_OUTER) {
      nullSupplyingTableNameSet.add(rightRelation);
      preservedTableNameSet.add(leftRelation);
    } else if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
      nullSupplyingTableNameSet.add(leftRelation);
      preservedTableNameSet.add(rightRelation);
    } else {
      // full outer join
      preservedTableNameSet.add(leftRelation);
      preservedTableNameSet.add(rightRelation);
      nullSupplyingTableNameSet.add(leftRelation);
      nullSupplyingTableNameSet.add(rightRelation);
    }

    Set<EvalNode> nonPushableQuals = new HashSet<>();
    for (EvalNode eachQual : onPredicates) {
      for (String relName : preservedTableNameSet) {
        if (isEvalNeedRelation(eachQual, relName)) {
          nonPushableQuals.add(eachQual);
        }
      }
    }

    for (EvalNode eachQual : wherePredicates) {
      for (String relName : nullSupplyingTableNameSet) {
        if (isEvalNeedRelation(eachQual, relName)) {
          nonPushableQuals.add(eachQual);
        }
      }
    }

    return nonPushableQuals;
  }

  private static boolean isEvalNeedRelation(final EvalNode evalNode, final String relationName) {
    Set<Column> columns = EvalTreeUtil.findUniqueColumns(evalNode);
    for (Column column : columns) {
      if (isColumnFromRelation(column, relationName)) {
        return true;
      }
    }
    return false;
  }

  private static boolean isColumnFromRelation(final Column column, final String relationName) {
    if (relationName.equals(column.getQualifier())) {
      return true;
    }
    return false;
  }

  private static boolean isNonEquiThetaJoinQual(final LogicalPlan.QueryBlock block,
                                                final JoinNode joinNode,
                                                final EvalNode evalNode) {
    if (EvalTreeUtil.isJoinQual(block, joinNode.getLeftChild().getOutSchema(),
        joinNode.getRightChild().getOutSchema(), evalNode, true) &&
        evalNode.getType() != EvalType.EQUAL) {
      return true;
    } else {
      return false;
    }
  }

  private static List<EvalNode> extractNonEquiThetaJoinQuals(final Set<EvalNode> predicates,
                                                             final LogicalPlan.QueryBlock block,
                                                             final JoinNode joinNode) {
    List<EvalNode> nonEquiThetaJoinQuals = TUtil.newList();
    for (EvalNode eachEval: predicates) {
      if (isNonEquiThetaJoinQual(block, joinNode, eachEval)) {
        nonEquiThetaJoinQuals.add(eachEval);
      }
    }
    return nonEquiThetaJoinQuals;
  }

  private Map<EvalNode, EvalNode> transformEvalsWidthByPassNode(
      Collection<EvalNode> originEvals, LogicalPlan plan,
      LogicalPlan.QueryBlock block,
      LogicalNode node, LogicalNode childNode) throws TajoException {
    // transformed -> pushingDownFilters
    Map<EvalNode, EvalNode> transformedMap = new HashMap<>();

    if (originEvals.isEmpty()) {
      return transformedMap;
    }

    if (node.getType() == NodeType.UNION) {
      // If node is union, All eval's columns are simple name and matched with child's output schema.
      Schema childOutSchema = childNode.getOutSchema();
      for (EvalNode eval : originEvals) {
        EvalNode copy;
        try {
          copy = (EvalNode) eval.clone();
        } catch (CloneNotSupportedException e) {
          throw new TajoInternalError(e);
        }

        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
        for (Column c : columns) {
          Column column = childOutSchema.getColumn(c.getSimpleName());
          if (column == null) {
            throw new TajoInternalError(
                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
                    + c.getQualifiedName() + " for FilterPushDown(" + eval + "), " +
                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")");
          }
          EvalTreeUtil.changeColumnRef(copy, c.getSimpleName(), column.getQualifiedName());
        }

        transformedMap.put(copy, eval);
      }
      return transformedMap;
    }

    if (childNode.getType() == NodeType.UNION) {
      // If child is union, remove qualifier from eval's column
      for (EvalNode eval : originEvals) {
        EvalNode copy;
        try {
          copy = (EvalNode) eval.clone();
        } catch (CloneNotSupportedException e) {
          throw new TajoInternalError(e);
        }

        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
        for (Column c : columns) {
          if (c.hasQualifier()) {
            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName());
          }
        }

        transformedMap.put(copy, eval);
      }

      return transformedMap;
    }

    // node in column -> child out column
    Map<String, String> columnMap = new HashMap<>();

    for (int i = 0; i < node.getInSchema().size(); i++) {
      String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
      Column childOutColumn = childNode.getOutSchema().getColumn(i);
      columnMap.put(inColumnName, childOutColumn.getQualifiedName());
    }

    // Rename from upper block's one to lower block's one
    for (EvalNode matchedEval : originEvals) {
      EvalNode copy;
      try {
        copy = (EvalNode) matchedEval.clone();
      } catch (CloneNotSupportedException e) {
        throw new TajoInternalError(e);
      }

      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
      boolean allMatched = true;
      for (Column c : columns) {
        if (columnMap.containsKey(c.getQualifiedName())) {
          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), columnMap.get(c.getQualifiedName()));
        } else {
          if (childNode.getType() == NodeType.GROUP_BY) {
            if (((GroupbyNode) childNode).isAggregationColumn(c.getSimpleName())) {
              allMatched = false;
              break;
            }
          } else {
            throw new TajoInternalError(
                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
                    + c.getQualifiedName() + " for FilterPushDown(" + matchedEval + "), " +
                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")"
            );
          }
        }
      }
      if (allMatched) {
        transformedMap.put(copy, matchedEval);
      }
    }

    return transformedMap;
  }

  @Override
  public LogicalNode visitTableSubQuery(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                        TableSubQueryNode node, Stack<LogicalNode> stack) throws TajoException {
    List<EvalNode> matched = TUtil.newList();
    List<EvalNode> unmatched = TUtil.newList();
    for (EvalNode eval : context.pushingDownFilters) {
      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
        matched.add(eval);
      } else {
        unmatched.add(eval);
      }
    }

    // transformed -> pushingDownFilters
    Map<EvalNode, EvalNode> transformedMap =
        transformEvalsWidthByPassNode(matched, plan, block, node, node.getSubQuery());
    context.setFiltersTobePushed(transformedMap.keySet());
    visit(context, plan, plan.getBlock(node.getSubQuery()));
    context.setToOrigin(transformedMap);
    context.addFiltersTobePushed(unmatched);

    return node;
  }

  @Override
  public LogicalNode visitUnion(FilterPushDownContext context, LogicalPlan plan,
                                LogicalPlan.QueryBlock block, UnionNode unionNode,
                                Stack<LogicalNode> stack) throws TajoException {
    LogicalNode leftNode = unionNode.getLeftChild();

    List<EvalNode> origins = TUtil.newList(context.pushingDownFilters);

    // transformed -> pushingDownFilters
    Map<EvalNode, EvalNode> transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, leftNode);
    context.setFiltersTobePushed(transformedMap.keySet());
    visit(context, plan, plan.getBlock(leftNode));

    if (!context.pushingDownFilters.isEmpty()) {
      errorFilterPushDown(plan, leftNode, context);
    }

    LogicalNode rightNode = unionNode.getRightChild();
    transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, rightNode);
    context.setFiltersTobePushed(transformedMap.keySet());
    visit(context, plan, plan.getBlock(rightNode), rightNode, stack);

    if (!context.pushingDownFilters.isEmpty()) {
      errorFilterPushDown(plan, rightNode, context);
    }

    // notify all filter matched to upper
    context.pushingDownFilters.clear();
    return unionNode;
  }

  @Override
  public LogicalNode visitProjection(FilterPushDownContext context,
                                     LogicalPlan plan,
                                     LogicalPlan.QueryBlock block,
                                     ProjectionNode projectionNode,
                                     Stack<LogicalNode> stack) throws TajoException {
    LogicalNode childNode = projectionNode.getChild();

    List<EvalNode> notMatched = TUtil.newList();

    //copy -> origin
    BiMap<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(
        context, block,projectionNode, childNode, notMatched, null, 0);

    context.setFiltersTobePushed(transformedMap.keySet());

    stack.push(projectionNode);
    visit(context, plan, plan.getBlock(childNode), childNode, stack);
    stack.pop();

    // find not matched after visiting child
    for (EvalNode eval: context.pushingDownFilters) {
      notMatched.add(transformedMap.get(eval));
    }

    EvalNode qual = null;
    if (notMatched.size() > 1) {
      // merged into one eval tree
      qual = AlgebraicUtil.createSingletonExprFromCNF(notMatched.toArray(new EvalNode[notMatched.size()]));
    } else if (notMatched.size() == 1) {
      // if the number of matched expr is one
      qual = notMatched.get(0);
    }

    // If there is not matched node add SelectionNode and clear context.pushingDownFilters
    if (qual != null && LogicalPlanner.checkIfBeEvaluatedAtThis(qual, projectionNode)) {
      SelectionNode selectionNode = plan.createNode(SelectionNode.class);
      selectionNode.setInSchema(childNode.getOutSchema());
      selectionNode.setOutSchema(childNode.getOutSchema());
      selectionNode.setQual(qual);
      block.registerNode(selectionNode);

      projectionNode.setChild(selectionNode);
      selectionNode.setChild(childNode);

      // clean all remain filters because all conditions are merged into a qual
      context.pushingDownFilters.clear();
    }

    // if there are remain filters, recover the original names and give back to the upper query block.
    if (context.pushingDownFilters.size() > 0) {
      ImmutableSet<EvalNode> copy = ImmutableSet.copyOf(context.pushingDownFilters);
      context.pushingDownFilters.clear();
      context.pushingDownFilters.addAll(reverseTransform(transformedMap, copy));
    }

    return projectionNode;
  }

  private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
    Set<EvalNode> reversed = new HashSet<>();
    for (EvalNode evalNode : remainFilters) {
      reversed.add(map.get(evalNode));
    }
    return reversed;
  }

  private BiMap<EvalNode, EvalNode> findCanPushdownAndTransform(
      FilterPushDownContext context, LogicalPlan.QueryBlock block, Projectable node,
      LogicalNode childNode, List<EvalNode> notMatched,
      Set<String> partitionColumns, int columnOffset) throws TajoException {
    // canonical name -> target
    Map<String, Target> nodeTargetMap = new HashMap<>();
    for (Target target : node.getTargets()) {
      nodeTargetMap.put(target.getCanonicalName(), target);
    }

    // copy -> origin
    BiMap<EvalNode, EvalNode> matched = HashBiMap.create();

    for (EvalNode eval : context.pushingDownFilters) {
      // If all column is field eval, can push down.
      Set<Column> evalColumns = EvalTreeUtil.findUniqueColumns(eval);
      boolean columnMatched = true;
      for (Column c : evalColumns) {
        Target target = nodeTargetMap.get(c.getQualifiedName());
        if (target == null) {
          columnMatched = false;
          break;
        }
        if (target.getEvalTree().getType() != EvalType.FIELD) {
          columnMatched = false;
          break;
        }
      }

      if (columnMatched) {
        // transform eval column to child's output column
        EvalNode copyEvalNode = transformEval(node, childNode, eval, nodeTargetMap, partitionColumns, columnOffset);
        if (copyEvalNode != null) {
          matched.put(copyEvalNode, eval);
        } else {
          notMatched.add(eval);
        }
      } else {
        notMatched.add(eval);
      }
    }

    return matched;
  }

  private EvalNode transformEval(Projectable node, LogicalNode childNode, EvalNode origin,
                                 Map<String, Target> targetMap, Set<String> partitionColumns,
                                 int columnOffset) throws TajoException {
    Schema outputSchema = childNode != null ? childNode.getOutSchema() : node.getInSchema();
    EvalNode copy;
    try {
      copy = (EvalNode) origin.clone();
    } catch (CloneNotSupportedException e) {
      throw new TajoInternalError(e);
    }
    Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
    for (Column c: columns) {
      Target target = targetMap.get(c.getQualifiedName());
      if (target == null) {
        throw new TajoInternalError(
            "Invalid Filter PushDown: No such a corresponding target '"
                + c.getQualifiedName() + " for FilterPushDown(" + origin + "), " +
                "(PID=" + node.getPID() + ")"
        );
      }
      EvalNode targetEvalNode = target.getEvalTree();
      if (targetEvalNode.getType() != EvalType.FIELD) {
        throw new TajoInternalError(
            "Invalid Filter PushDown: '" + c.getQualifiedName() + "' target is not FieldEval " +
                "(PID=" + node.getPID() + ")"
        );
      }

      FieldEval fieldEval = (FieldEval)targetEvalNode;
      Column targetInputColumn = fieldEval.getColumnRef();

      int index;
      if (targetInputColumn.hasQualifier()) {
        index = node.getInSchema().getColumnId(targetInputColumn.getQualifiedName());
      } else {
        index = node.getInSchema().getColumnIdByName(targetInputColumn.getQualifiedName());
      }
      if (columnOffset > 0) {
        index = index - columnOffset;
      }
      if (index < 0 || index >= outputSchema.size()) {
        if (partitionColumns != null && !partitionColumns.isEmpty() && node instanceof ScanNode) {
          ScanNode scanNode = (ScanNode)node;
          boolean isPartitionColumn = false;
          if (CatalogUtil.isFQColumnName(partitionColumns.iterator().next())) {
            isPartitionColumn = partitionColumns.contains(
                CatalogUtil.buildFQName(scanNode.getTableName(), c.getSimpleName()));
          } else {
            isPartitionColumn = partitionColumns.contains(c.getSimpleName());
          }
          if (isPartitionColumn) {
            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(),
                scanNode.getCanonicalName() + "." + c.getSimpleName());
          } else {
            return null;
          }
        } else {
          return null;
        }
      } else {
        Column outputColumn = outputSchema.getColumn(index);
        EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
      }
    }

    return copy;
  }

  /**
   * Find aggregation columns in filter eval and add having clause or add HavingNode.
   * @param context
   * @param plan
   * @param block
   * @param parentNode  If null, having is parent
   * @param havingNode      If null, projection is parent
   * @param groupByNode
   * @return matched origin eval
   * @throws TajoException
   */
  private List<EvalNode> addHavingNode(FilterPushDownContext context, LogicalPlan plan,
                                       LogicalPlan.QueryBlock block,
                                       UnaryNode parentNode,
                                       HavingNode havingNode,
                                       GroupbyNode groupByNode) throws TajoException {
    // find aggregation column
    Set<Column> groupingColumns = new HashSet<>(Arrays.asList(groupByNode.getGroupingColumns()));
    Set<String> aggrFunctionOutColumns = new HashSet<>();
    for (Column column : groupByNode.getOutSchema().getRootColumns()) {
      if (!groupingColumns.contains(column)) {
        aggrFunctionOutColumns.add(column.getQualifiedName());
      }
    }

    List<EvalNode> aggrEvalOrigins = TUtil.newList();
    List<EvalNode> aggrEvals = TUtil.newList();

    for (EvalNode eval : context.pushingDownFilters) {
      EvalNode copy = null;
      try {
        copy = (EvalNode)eval.clone();
      } catch (CloneNotSupportedException e) {
      }
      boolean isEvalAggrFunction = false;
      for (Column evalColumn : EvalTreeUtil.findUniqueColumns(copy)) {
        if (aggrFunctionOutColumns.contains(evalColumn.getSimpleName())) {
          EvalTreeUtil.changeColumnRef(copy, evalColumn.getQualifiedName(), evalColumn.getSimpleName());
          isEvalAggrFunction = true;
          break;
        }
      }
      if (groupByNode.getInSchema().containsAll(EvalTreeUtil.findUniqueColumns(copy))) {
        isEvalAggrFunction = true;
      }
      if (isEvalAggrFunction) {
        aggrEvals.add(copy);
        aggrEvalOrigins.add(eval);
      }
    }

    if (aggrEvals.isEmpty()) {
      return aggrEvalOrigins;
    }

    // transform
    HavingNode workingHavingNode;
    if (havingNode != null) {
      workingHavingNode = havingNode;
      aggrEvals.add(havingNode.getQual());
    } else {
      workingHavingNode = plan.createNode(HavingNode.class);
      block.registerNode(workingHavingNode);
      parentNode.setChild(workingHavingNode);
      workingHavingNode.setChild(groupByNode);
    }

    EvalNode qual = null;
    if (aggrEvals.size() > 1) {
      // merged into one eval tree
      qual = AlgebraicUtil.createSingletonExprFromCNF(aggrEvals.toArray(new EvalNode[aggrEvals.size()]));
    } else if (aggrEvals.size() == 1) {
      // if the number of matched expr is one
      qual = aggrEvals.get(0);
    }

    // If there is not matched node add SelectionNode and clear context.pushingDownFilters
    if (qual != null) {
      workingHavingNode.setQual(qual);
    }

    return aggrEvalOrigins;
  }

  @Override
  public LogicalNode visitWindowAgg(FilterPushDownContext context, LogicalPlan plan,
                                  LogicalPlan.QueryBlock block, WindowAggNode winAggNode,
                                  Stack<LogicalNode> stack) throws TajoException {
    stack.push(winAggNode);
    super.visitWindowAgg(context, plan, block, winAggNode, stack);
    stack.pop();
    return winAggNode;
  }


  @Override
  public LogicalNode visitGroupBy(FilterPushDownContext context, LogicalPlan plan,
                                  LogicalPlan.QueryBlock block, GroupbyNode groupbyNode,
                                  Stack<LogicalNode> stack) throws TajoException {
    LogicalNode parentNode = stack.peek();
    List<EvalNode> aggrEvals;
    if (parentNode.getType() == NodeType.HAVING) {
      aggrEvals = addHavingNode(context, plan, block, null, (HavingNode)parentNode, groupbyNode);
    } else {
      aggrEvals = addHavingNode(context, plan, block, (UnaryNode)parentNode, null, groupbyNode);
    }

    if (aggrEvals != null) {
      // remove aggregation eval from conext
      context.pushingDownFilters.removeAll(aggrEvals);
    }

    List<EvalNode> notMatched = TUtil.newList();
    // transform
    Map<EvalNode, EvalNode> transformed =
        findCanPushdownAndTransform(context, block, groupbyNode,groupbyNode.getChild(), notMatched, null, 0);

    context.setFiltersTobePushed(transformed.keySet());
    LogicalNode current = super.visitGroupBy(context, plan, block, groupbyNode, stack);

    context.setToOrigin(transformed);
    context.addFiltersTobePushed(notMatched);

    return current;
  }

  @Override
  public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
                               LogicalPlan.QueryBlock block, final ScanNode scanNode,
                               Stack<LogicalNode> stack) throws TajoException {
    List<EvalNode> matched = TUtil.newList();

    // find partition column and check matching
    Set<String> partitionColumns = new HashSet<>();
    TableDesc table = scanNode.getTableDesc();
    boolean hasQualifiedName = false;
    if (table.hasPartition()) {
      for (Column c: table.getPartitionMethod().getExpressionSchema().getRootColumns()) {
        partitionColumns.add(c.getQualifiedName());
        hasQualifiedName = c.hasQualifier();
      }
    }
    Set<EvalNode> partitionEvals = new HashSet<>();
    for (EvalNode eval : context.pushingDownFilters) {
      if (table.hasPartition()) {
        Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
        if (columns.size() != 1) {
          continue;
        }
        Column column = columns.iterator().next();

        // If catalog runs with HiveCatalogStore, partition column is a qualified name
        // Else partition column is a simple name
        boolean isPartitionColumn = false;
        if (hasQualifiedName) {
          isPartitionColumn = partitionColumns.contains(CatalogUtil.buildFQName(table.getName(), column.getSimpleName()));
        } else {
          isPartitionColumn = partitionColumns.contains(column.getSimpleName());
        }
        if (isPartitionColumn) {
          EvalNode copy;
          try {
            copy = (EvalNode) eval.clone();
          } catch (CloneNotSupportedException e) {
            throw new TajoInternalError(e);
          }
          EvalTreeUtil.changeColumnRef(copy, column.getQualifiedName(),
              scanNode.getCanonicalName() + "." + column.getSimpleName());
          matched.add(copy);
          partitionEvals.add(eval);
        }
      }
    }

    context.pushingDownFilters.removeAll(partitionEvals);

    List<EvalNode> notMatched = TUtil.newList();

    // transform
    Map<EvalNode, EvalNode> transformed =
        findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, 0);

    for (EvalNode eval : transformed.keySet()) {
      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {
        matched.add(eval);
      }
    }

    EvalNode qual = null;
    if (matched.size() > 1) {
      // merged into one eval tree
      qual = AlgebraicUtil.createSingletonExprFromCNF(
          matched.toArray(new EvalNode[matched.size()]));
    } else if (matched.size() == 1) {
      // if the number of matched expr is one
      qual = matched.iterator().next();
    }

    block.addAccessPath(scanNode, new SeqScanInfo(table));
    if (qual != null) { // if a matched qual exists
      scanNode.setQual(qual);

      // Index path can be identified only after filters are pushed into each scan.
      if(context.rewriteRuleContext.getQueryContext().getBool(SessionVars.INDEX_ENABLED)) {
        String databaseName, tableName;
        databaseName = CatalogUtil.extractQualifier(table.getName());
        tableName = CatalogUtil.extractSimpleName(table.getName());
        Set<Predicate> predicates = new HashSet<>();
        for (EvalNode eval : PlannerUtil.getAllEqualEvals(qual)) {
          BinaryEval binaryEval = (BinaryEval) eval;
          // TODO: consider more complex predicates
          if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
              binaryEval.getRightExpr().getType() == EvalType.CONST) {
            predicates.add(new Predicate(binaryEval.getType(),
                ((FieldEval) binaryEval.getLeftExpr()).getColumnRef(),
                ((ConstEval) binaryEval.getRightExpr()).getValue()));
          } else if (binaryEval.getLeftExpr().getType() == EvalType.CONST &&
              binaryEval.getRightExpr().getType() == EvalType.FIELD) {
            predicates.add(new Predicate(binaryEval.getType(),
                ((FieldEval) binaryEval.getRightExpr()).getColumnRef(),
                ((ConstEval) binaryEval.getLeftExpr()).getValue()));
          }
        }

        // for every subset of the set of columns, find all matched index paths
        for (Set<Predicate> subset : Sets.powerSet(predicates)) {
          if (subset.size() == 0)
            continue;
          Column[] columns = extractColumns(subset);
          if (catalog.existIndexByColumns(databaseName, tableName, columns)) {
            IndexDesc indexDesc = catalog.getIndexByColumns(databaseName, tableName, columns);
            block.addAccessPath(scanNode, new IndexScanInfo(
                table.getStats(), indexDesc, getSimplePredicates(indexDesc, subset)));
          }
        }
      }
    }

    for (EvalNode matchedEval: matched) {
      transformed.remove(matchedEval);
    }

    context.setToOrigin(transformed);
    context.addFiltersTobePushed(notMatched);

    return scanNode;
  }

  private static class Predicate {
    Column column;
    Datum value;
    EvalType evalType;

    public Predicate(EvalType evalType, Column column, Datum value) {
      this.evalType = evalType;
      this.column = column;
      this.value = value;
    }
  }

  private static SimplePredicate[] getSimplePredicates(IndexDesc desc, Set<Predicate> predicates) {
    SimplePredicate[] simplePredicates = new SimplePredicate[predicates.size()];
    Map<Column, Datum> colToValue = new HashMap<>();
    for (Predicate predicate : predicates) {
      colToValue.put(predicate.column, predicate.value);
    }
    SortSpec [] keySortSpecs = desc.getKeySortSpecs();
    for (int i = 0; i < keySortSpecs.length; i++) {
      simplePredicates[i] = new SimplePredicate(keySortSpecs[i],
          colToValue.get(keySortSpecs[i].getSortKey()));
    }
    return simplePredicates;
  }

  private static Column[] extractColumns(Set<Predicate> predicates) {
    Column[] columns = new Column[predicates.size()];
    int i = 0;
    for (Predicate p : predicates) {
      columns[i++] = p.column;
    }
    return columns;
  }

  private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
                                   FilterPushDownContext context) {
    String prefix = "";
    StringBuilder notMatchedNodeStrBuilder = new StringBuilder();
    for (EvalNode notMatchedNode: context.pushingDownFilters) {
      notMatchedNodeStrBuilder.append(prefix).append(notMatchedNode.toString());
      if (prefix.isEmpty()) {
        prefix = ", ";
      }
    }
    throw new TajoInternalError("FilterPushDown failed cause some filters not matched: "
        + notMatchedNodeStrBuilder.toString() + "\n" +
        "Error node: " + node.getPlanString() + "\n" + plan.toString());
  }
}
