/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.physicalLayer;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.parser.SourceLocation;
import org.apache.pig.pen.Illustrable;
import org.apache.pig.pen.Illustrator;
import org.apache.pig.pen.util.LineageTracer;

/**
 *
 * This is the base class for all operators. This supports a generic way of
 * processing inputs which can be overridden by operators extending this class.
 * The input model assumes that it can either be taken from an operator or can
 * be attached directly to this operator. Also it is assumed that inputs to an
 * operator are always in the form of a tuple.
 *
 * For this pipeline rework, we assume a pull based model, i.e, the root
 * operator is going to call getNext with the appropriate type which initiates a
 * cascade of getNext calls that unroll to create input for the root operator to
 * work on.
 *
 * Any operator that extends the PhysicalOperator, supports a getNext with all
 * the different types of parameter types. The concrete implementation should
 * use the result type of its input operator to decide the type of getNext's
 * parameter. This is done to avoid switch/case based on the type as much as
 * possible. The default is assumed to return an erroneus Result corresponding
 * to an unsupported operation on that type. So the operators need to implement
 * only those types that are supported.
 *
 */
public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Illustrable, Cloneable {
    private static final Log log = LogFactory.getLog(PhysicalOperator.class);

    protected static final long serialVersionUID = 1L;
    protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
    protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
    protected static final BagFactory mBagFactory = BagFactory.getInstance();

    // The degree of parallelism requested
    protected int requestedParallelism;

    // The inputs that this operator will read data from
    protected List<PhysicalOperator> inputs;

    // The outputs that this operator will write data to
    // Will be used to create Targeted tuples
    protected List<PhysicalOperator> outputs;

    // The data type for the results of this operator
    protected byte resultType = DataType.TUPLE;

    // The physical plan this operator is part of
    protected PhysicalPlan parentPlan;

    // Specifies if the input has been directly attached
    protected boolean inputAttached = false;

    // If inputAttached is true, input is set to the input tuple
    protected Tuple input = null;

    // The result of performing the operation along with the output
    protected Result res = null;


    // alias associated with this PhysicalOperator
    protected String alias = null;

    // Will be used by operators to report status or transmit heartbeat
    // Should be set by the backends to appropriate implementations that
    // wrap their own version of a reporter.
    protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();

    // Will be used by operators to aggregate warning messages
    // Should be set by the backends to appropriate implementations that
    // wrap their own version of a logger.
    protected static PigLogger pigLogger;

    // TODO: This is not needed. But a lot of tests check serialized physical plans
    // that are sensitive to the serialized image of the contained physical operators.
    // So for now, just keep it. Later it'll be cleansed along with those test golden
    // files
    protected LineageTracer lineageTracer;

    protected transient Illustrator illustrator = null;

    private boolean accum;
    private transient boolean accumStart;

    private List<OriginalLocation> originalLocations =  new ArrayList<OriginalLocation>();

    public PhysicalOperator(OperatorKey k) {
        this(k, -1, null);
    }

    public PhysicalOperator(OperatorKey k, int rp) {
        this(k, rp, null);
    }

    public PhysicalOperator(OperatorKey k, List<PhysicalOperator> inp) {
        this(k, -1, inp);
    }

    public PhysicalOperator(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        super(k);
        requestedParallelism = rp;
        inputs = inp;
        res = new Result();
    }

    public PhysicalOperator(PhysicalOperator copy) {
        super (copy.getOperatorKey());
        this.res = new Result();
        this.requestedParallelism = copy.requestedParallelism;
        this.inputs = copy.inputs;
        this.outputs = copy.outputs;
        this.resultType = copy.resultType;
        this.parentPlan = copy.parentPlan;
        this.inputAttached = copy.inputAttached;
        this.alias = copy.alias;
        this.lineageTracer = copy.lineageTracer;
        this.accum = copy.accum;
        this.originalLocations = copy.originalLocations;
    }

    @Override
    public void setIllustrator(Illustrator illustrator) {
	      this.illustrator = illustrator;
    }

    public Illustrator getIllustrator() {
        return illustrator;
    }

    public int getRequestedParallelism() {
        return requestedParallelism;
    }

    public void setRequestedParallelism(int requestedParallelism) {
        this.requestedParallelism = requestedParallelism;
    }

    public byte getResultType() {
        return resultType;
    }

    public String getAlias() {
        return alias;
    }

    protected String getAliasString() {
        return (alias == null) ? "" : (alias + ": ");
    }

    public void copyAliasFrom(PhysicalOperator op) {
        this.alias = op.alias;
        this.originalLocations = op.originalLocations;
    }

    public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
        this.alias = alias;
        this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
    }

    public void addOriginalLocation(String alias, List<OriginalLocation> originalLocations) {
        this.alias = alias;
        this.originalLocations.addAll(originalLocations);
    }

    public List<OriginalLocation> getOriginalLocations() {
        return Collections.unmodifiableList(originalLocations);
    }

    public void setAccumulative() {
        accum = true;
    }

    public boolean isAccumulative() {
       return accum;
    }

    public void setAccumStart() {
       if (!accum) {
               throw new IllegalStateException("Accumulative is not turned on.");
       }
       accumStart = true;
    }

    public boolean isAccumStarted() {
    	return accumStart;
    }

    public void setAccumEnd() {
       if (!accum){
    	   throw new IllegalStateException("Accumulative is not turned on.");
       }
       accumStart = false;
    }

    public void setResultType(byte resultType) {
        this.resultType = resultType;
    }

    public List<PhysicalOperator> getInputs() {
        return inputs;
    }

    public void setInputs(List<PhysicalOperator> inputs) {
        this.inputs = inputs;
    }

    public boolean isInputAttached() {
        return inputAttached;
    }

    /**
     * Shorts the input path of this operator by providing the input tuple
     * directly
     *
     * @param t -
     *            The tuple that should be used as input
     */
    public void attachInput(Tuple t) {
        input = t;
        this.inputAttached = true;
    }

    /**
     * Detaches any tuples that are attached
     *
     */
    public void detachInput() {
        input = null;
        this.inputAttached = false;
    }

    /**
     * A blocking operator should override this to return true. Blocking
     * operators are those that need the full bag before operate on the tuples
     * inside the bag. Example is the Global Rearrange. Non-blocking or pipeline
     * operators are those that work on a tuple by tuple basis.
     *
     * @return true if blocking and false otherwise
     */
    public boolean isBlocking() {
        return false;
    }

    /**
     * A generic method for parsing input that either returns the attached input
     * if it exists or fetches it from its predecessor. If special processing is
     * required, this method should be overridden.
     *
     * @return The Result object that results from processing the input
     * @throws ExecException
     */
    public Result processInput() throws ExecException {
        try {
            if (input == null && (inputs == null || inputs.size() == 0)) {
                // log.warn("No inputs found. Signaling End of Processing.");
                return RESULT_EOP;
            }

            // Should be removed once the model is clear
            PigProgressable progRep = getReporter();
            if (progRep != null) {
                progRep.progress();
            }

            if (!isInputAttached()) {
                return inputs.get(0).getNextTuple();
            } else {
                Result res = new Result();
                res.result = input;
                res.returnStatus = POStatus.STATUS_OK;
                detachInput();
                return res;
            }
        } catch (ExecException e) {
            throw new ExecException("Exception while executing "
                    + this.toString() + ": " + e.toString(), e);
        }
    }

    @Override
    public abstract void visit(PhyPlanVisitor v) throws VisitorException;

    /**
     * Implementations that call into the different versions of getNext are often
     * identical, differing only in the signature of the getNext() call they make.
     * This method allows to cut down on some of the copy-and-paste.
     * @param dataType Describes the type of obj; a byte from DataType.
     *
     * @return result Result of applying this Operator to the Object.
     * @throws ExecException
     */
    public Result getNext(byte dataType) throws ExecException {
        try {
            switch (dataType) {
            case DataType.BAG:
                return getNextDataBag();
            case DataType.BOOLEAN:
                return getNextBoolean();
            case DataType.BYTEARRAY:
                return getNextDataByteArray();
            case DataType.CHARARRAY:
                return getNextString();
            case DataType.DOUBLE:
                return getNextDouble();
            case DataType.FLOAT:
                return getNextFloat();
            case DataType.INTEGER:
                return getNextInteger();
            case DataType.LONG:
                return getNextLong();
            case DataType.BIGINTEGER:
                return getNextBigInteger();
            case DataType.BIGDECIMAL:
                return getNextBigDecimal();
            case DataType.DATETIME:
                return getNextDateTime();
            case DataType.MAP:
                return getNextMap();
            case DataType.TUPLE:
                return getNextTuple();
            default:
                throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
            }
        } catch (ExecException e) {
            throw new ExecException("Exception while executing for " + originalLocationsToDescriptiveString()
                                    + ": " + e.toString(), e.getErrorCode(), e);
        } catch (RuntimeException e) {
            throw new ExecException("Exception while executing " + this.toString() + ": " + e.toString(), e);
        }
    }

    private String originalLocationsToDescriptiveString() {
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        boolean isFirstItem = true;
        for( OriginalLocation ol : getOriginalLocations() ) {
            if( isFirstItem ) {
                isFirstItem = false;
            } else {
                sb.append(",");
            }
            sb.append(ol.getAlias()).append("[line=").append(ol.getLine())
              .append(",offset=").append(ol.getOffset()).append("]");
        }
        return sb.append(']').toString();
    }

    public Result getNextInteger() throws ExecException {
        return res;
    }

    public Result getNextLong() throws ExecException {
        return res;
    }

    public Result getNextDouble() throws ExecException {
        return res;
    }

    public Result getNextFloat() throws ExecException {
        return res;
    }

    public Result getNextDateTime() throws ExecException {
        return res;
    }

    public Result getNextString() throws ExecException {
        return res;
    }

    public Result getNextDataByteArray() throws ExecException {
        return res;
    }

    public Result getNextMap() throws ExecException {
        return res;
    }

    public Result getNextBoolean() throws ExecException {
        return res;
    }

    public Result getNextTuple() throws ExecException {
        return res;
    }

    public Result getNextDataBag() throws ExecException {
        Result val = new Result();
        DataBag tmpBag = mBagFactory.newDefaultBag();
        for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
            if (ret.returnStatus == POStatus.STATUS_ERR) {
                return ret;
            } else if (ret.returnStatus == POStatus.STATUS_NULL) {
                continue;
            } else {
                tmpBag.add((Tuple) ret.result);
            }
        }
        val.result = tmpBag;
        val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
        return val;
    }

    public Result getNextBigInteger() throws ExecException {
        return res;
    }

    public Result getNextBigDecimal() throws ExecException {
        return res;
    }

    /**
     * Reset internal state in an operator.  For use in nested pipelines
     * where operators like limit and sort may need to reset their state.
     * Limit needs it because it needs to know it's seeing a fresh set of
     * input.  Blocking operators like sort and distinct need it because they
     * may not have drained their previous input due to a limit and thus need
     * to be told to drop their old input and start over.
     */
    public void reset() {
    }

    public boolean isEndOfAllInput() {
        return parentPlan.endOfAllInput;
    }

    /**
     * @return PigProgressable stored in threadlocal
     */
    public static PigProgressable getReporter() {
        return PhysicalOperator.reporter.get();
    }

    /**
     * @param reporter PigProgressable to be stored in threadlocal
     */
    public static void setReporter(PigProgressable reporter) {
        PhysicalOperator.reporter.set(reporter);
    }

    //@StaticDataCleanup
    public static void staticDataCleanup() {
        reporter = new ThreadLocal<PigProgressable>();
    }

    /**
     * Make a copy of this operator. This function is blank, however,
     * we should leave a place holder so that the subclasses can clone
     * to make deep copy as this one creates a shallow copy of
     * non-primitive types (objects, arrays and lists)
     *
     * @throws CloneNotSupportedException
     */
    @Override
    public PhysicalOperator clone() throws CloneNotSupportedException {
        return (PhysicalOperator)super.clone();
    }

    protected void cloneHelper(PhysicalOperator op) {
        resultType = op.resultType;
        originalLocations.addAll(op.originalLocations);
    }

    protected static List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws CloneNotSupportedException {
        List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(origPlans.size());
        for (PhysicalPlan plan : origPlans) {
            clonePlans.add(plan.clone());
        }
        return clonePlans;
    }

    /**
     * @param physicalPlan
     */
    public void setParentPlan(PhysicalPlan physicalPlan) {
       parentPlan = physicalPlan;
    }

    public PhysicalPlan getParentPlan() {
        return parentPlan;
    }

    public Log getLogger() {
        return log;
    }

    public static void setPigLogger(PigLogger logger) {
        pigLogger = logger;
    }

    public static PigLogger getPigLogger() {
        return pigLogger;
    }

    public static class OriginalLocation implements Serializable {
        private String alias;
        private int line;
        private int offset;

        public OriginalLocation(String alias, int line, int offset) {
            super();
            this.alias = alias;
            this.line = line;
            this.offset = offset;
        }

        public String getAlias() {
            return alias;
        }

        public int getLine() {
            return line;
        }

        public int getOffset() {
            return offset;
        }

        @Override
        public String toString() {
            return alias+"["+line+","+offset+"]";
        }
    }
}
