/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.engine.planner.physical;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.plan.expr.AlgebraicUtil;
import org.apache.tajo.plan.expr.BinaryEval;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.NullTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

/**
 * common exec for all join execs
 */
public abstract class CommonJoinExec extends BinaryPhysicalExec {

  // from logical plan
  protected JoinNode plan;
  protected final boolean hasJoinQual;

  protected EvalNode joinQual;         // ex) a.id = b.id
  protected EvalNode leftJoinFilter;   // ex) a > 10
  protected EvalNode rightJoinFilter;  // ex) b > 5

  protected final Schema leftSchema;
  protected final Schema rightSchema;

  protected final FrameTuple frameTuple;

  // projection
  protected Projector projector;

  public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
        plan.getOutSchema(), outer, inner);
    this.plan = plan;
    this.leftSchema = outer.getSchema();
    this.rightSchema = inner.getSchema();
    if (plan.hasJoinQual()) {
      EvalNode[] extracted = extractJoinConditions(plan.getJoinQual(), leftSchema, rightSchema);
      joinQual = extracted[0];
      leftJoinFilter = extracted[1];
      rightJoinFilter = extracted[2];
    }
    this.hasJoinQual = joinQual != null;

    // for projection
    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());

    // for join
    this.frameTuple = new FrameTuple();
  }

  /**
   * It separates a singular CNF-formed join condition into a join condition, a left join filter, and
   * right join filter.
   *
   * @param joinQual the original join condition
   * @param leftSchema Left table schema
   * @param rightSchema Left table schema
   * @return Three element EvalNodes, 0 - join condition, 1 - left join filter, 2 - right join filter.
   */
  private EvalNode[] extractJoinConditions(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
    List<EvalNode> joinQuals = Lists.newArrayList();
    List<EvalNode> leftFilters = Lists.newArrayList();
    List<EvalNode> rightFilters = Lists.newArrayList();
    for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) {
      if (!(eachQual instanceof BinaryEval)) {
        continue; // todo 'between', etc.
      }
      BinaryEval binaryEval = (BinaryEval)eachQual;
      LinkedHashSet<Column> leftColumns = EvalTreeUtil.findUniqueColumns(binaryEval.getLeftExpr());
      LinkedHashSet<Column> rightColumns = EvalTreeUtil.findUniqueColumns(binaryEval.getRightExpr());
      boolean leftInLeft = leftSchema.containsAny(leftColumns);
      boolean rightInLeft = leftSchema.containsAny(rightColumns);
      boolean leftInRight = rightSchema.containsAny(leftColumns);
      boolean rightInRight = rightSchema.containsAny(rightColumns);

      boolean columnsFromLeft = leftInLeft || rightInLeft;
      boolean columnsFromRight = leftInRight || rightInRight;
      if (!columnsFromLeft && !columnsFromRight) {
        continue; // todo constant expression : this should be done in logical phase
      }
      if (columnsFromLeft ^ columnsFromRight) {
        if (columnsFromLeft) {
          leftFilters.add(eachQual);
        } else {
          rightFilters.add(eachQual);
        }
        continue;
      }
      if ((leftInLeft && rightInLeft) || (leftInRight && rightInRight)) {
        continue; // todo not allowed yet : this should be checked in logical phase
      }
      joinQuals.add(eachQual);
    }
    return new EvalNode[] {
        joinQuals.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinQuals),
        leftFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(leftFilters),
        rightFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(rightFilters)
    };
  }

  public JoinNode getPlan() {
    return plan;
  }

  /**
   * Evaluate an input tuple with a left join filter
   *
   * @param left Tuple to be evaluated
   * @return True if an input tuple is matched to the left join filter
   */
  protected boolean leftFiltered(Tuple left) {
    return leftJoinFilter != null && !leftJoinFilter.eval(left).asBool();
  }

  /**
   * Evaluate an input tuple with a right join filter
   *
   * @param right Tuple to be evaluated
   * @return True if an input tuple is matched to the right join filter
   */
  protected boolean rightFiltered(Tuple right) {
    return rightJoinFilter != null && !rightJoinFilter.eval(right).asBool();
  }

  /**
   * Return an tuple iterator filters rows in a right table by using a join filter.
   * It must takes rows of a right table.
   *
   * @param rightTuples Tuple iterator
   * @return rows Filtered by a join filter on right table.
   */
  protected Iterator<Tuple> rightFiltered(Iterable<Tuple> rightTuples) {
    if (rightTuples == null) {
      return Iterators.emptyIterator();
    }
    if (rightJoinFilter == null) {
      return rightTuples.iterator();
    }
    return Iterators.filter(rightTuples.iterator(), new Predicate<Tuple>() {
      @Override
      public boolean apply(Tuple input) {
        return rightJoinFilter.eval(input).asBool();
      }
    });
  }

  /**
   * Create a list that contains a single null tuple.
   *
   * @param width the width of null tuple which will be created.
   * @return created list of a null tuple
   */
  protected List<Tuple> nullTupleList(int width) {
    return Arrays.asList(NullTuple.create(width));
  }

  @Override
  public void init() throws IOException {
    super.init();
    if (hasJoinQual) {
      joinQual.bind(context.getEvalContext(), inSchema);
    }
    if (leftJoinFilter != null) {
      leftJoinFilter.bind(context.getEvalContext(), leftSchema);
    }
    if (rightJoinFilter != null) {
      rightJoinFilter.bind(context.getEvalContext(), rightSchema);
    }
  }

  @Override
  protected void compile() {
    if (hasJoinQual) {
      joinQual = context.getPrecompiledEval(inSchema, joinQual);
    }
    // compile filters?
  }

  @Override
  public void close() throws IOException {
    super.close();
    plan = null;
    joinQual = null;
    leftJoinFilter = null;
    rightJoinFilter = null;
    projector = null;
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + " [" + leftSchema + " : " + rightSchema + "]";
  }
}
