/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;

import java.util.LinkedList;
import java.util.List;

import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;

/**
 * Physical operator for a nested-loop join.
 * <p>
 * Only {@code BROADCAST} partitioning is supported; property computation throws
 * {@link NotImplementedException} for any other partitioning type. The first input
 * (index 0) is required to be broadcast, while no requirement is placed on the second
 * input (index 1). The delivered partitioning is taken from the second input and no
 * local structural properties are delivered.
 */
public class NLJoinPOperator extends AbstractJoinPOperator {

    // Forwarded unchanged to NestedLoopJoinOperatorDescriptor; its unit is defined there.
    private final int memSize;

    /**
     * @param kind
     *            the join kind; only {@code INNER} can currently be turned into a runtime operator
     * @param partitioningType
     *            the partitioning strategy; only {@code BROADCAST} is supported
     * @param memSize
     *            memory budget handed to the runtime nested-loop join descriptor
     */
    public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
        super(kind, partitioningType);
        this.memSize = memSize;
    }

    @Override
    public PhysicalOperatorTag getOperatorTag() {
        return PhysicalOperatorTag.NESTED_LOOP;
    }

    @Override
    public boolean isMicroOperator() {
        return false;
    }

    /**
     * Sets the delivered properties of this operator.
     * <p>
     * In {@code PARTITIONED} execution mode the partitioning property is copied from the
     * delivered properties of the second input's physical operator (or is {@code null} when
     * that operator has no delivered properties yet). In any other mode the result is
     * {@link IPartitioningProperty#UNPARTITIONED}. The list of local properties is always empty.
     *
     * @throws NotImplementedException
     *             if the partitioning type is not {@code BROADCAST}
     */
    @Override
    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
        requireBroadcastPartitioning();

        AbstractLogicalOperator joinOp = (AbstractLogicalOperator) iop;
        IPartitioningProperty partitioning;
        if (joinOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
            AbstractLogicalOperator rightInput = (AbstractLogicalOperator) joinOp.getInputs().get(1).getValue();
            IPhysicalPropertiesVector rightDelivered = rightInput.getPhysicalOperator().getDeliveredProperties();
            partitioning = (rightDelivered == null) ? null : rightDelivered.getPartitioningProperty();
        } else {
            partitioning = IPartitioningProperty.UNPARTITIONED;
        }

        List<ILocalStructuralProperty> noLocalProps = new LinkedList<ILocalStructuralProperty>();
        this.deliveredProperties = new StructuralPropertiesVector(partitioning, noLocalProps);
    }

    /**
     * Requires the first input to be broadcast and places no requirement on the second input.
     * The properties requested by the parent are not consulted.
     *
     * @throws NotImplementedException
     *             if the partitioning type is not {@code BROADCAST}
     */
    @Override
    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
            IPhysicalPropertiesVector reqdByParent) {
        requireBroadcastPartitioning();

        StructuralPropertiesVector leftRequirement = new StructuralPropertiesVector(
                new BroadcastPartitioningProperty(null), null);
        StructuralPropertiesVector rightRequirement = new StructuralPropertiesVector(null, null);
        StructuralPropertiesVector[] childRequirements = new StructuralPropertiesVector[] { leftRequirement,
                rightRequirement };
        return new PhysicalRequirements(childRequirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
    }

    /**
     * Builds the runtime nested-loop join descriptor for {@code op}, registers it with the
     * builder and connects both inputs of {@code op} to it (input 0 to port 0, input 1 to port 1).
     * <p>
     * The join condition is compiled against the propagated schema only; {@code inputSchemas}
     * and {@code outerPlanSchema} are not used here.
     *
     * @throws NotImplementedException
     *             if the join kind is anything other than {@code INNER}
     * @throws AlgebricksException
     *             if building the record descriptor or the condition evaluator factory fails
     */
    @Override
    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
            throws AlgebricksException {
        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                propagatedSchema, context);

        // The condition sees the concatenated (left + right) tuple, i.e. the propagated schema.
        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[] { propagatedSchema };
        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(
                join.getCondition().getValue(), context.getTypeEnvironment(op), conditionInputSchemas, context);
        ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
                context.getBinaryBooleanInspectorFactory());
        IOperatorDescriptorRegistry spec = builder.getJobSpec();

        final IOperatorDescriptor opDesc;
        switch (kind) {
            case INNER:
                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize);
                break;
            case LEFT_OUTER:
            default:
                // Name the offending kind, matching the message style of the partitioning guard.
                throw new NotImplementedException(kind + " nested loop joins are not implemented.");
        }
        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);

        ILogicalOperator leftSource = op.getInputs().get(0).getValue();
        builder.contributeGraphEdge(leftSource, 0, op, 0);
        ILogicalOperator rightSource = op.getInputs().get(1).getValue();
        builder.contributeGraphEdge(rightSource, 0, op, 1);
    }

    /** Rejects every partitioning type other than {@code BROADCAST}. */
    private void requireBroadcastPartitioning() {
        if (partitioningType != JoinPartitioningType.BROADCAST) {
            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
        }
    }

    /**
     * Factory producing {@link TuplePairEvaluator}s that decide tuple-pair matches by
     * evaluating a scalar join condition.
     */
    public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {

        private static final long serialVersionUID = 1L;
        private final IScalarEvaluatorFactory cond;
        private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;

        /**
         * @param cond
         *            factory for the evaluator of the join condition
         * @param binaryBooleanInspectorFactory
         *            factory for the inspector used to decode the condition's binary result
         */
        public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
                IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
            this.cond = cond;
            this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
        }

        /** Creates a new evaluator with its own boolean inspector for the given task context. */
        @Override
        public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
            return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
        }
    }

    /**
     * Tuple-pair "comparator" that evaluates the join condition over the concatenation of an
     * outer and an inner tuple. The underlying scalar evaluator is created on the first call
     * to {@link #compare}.
     */
    public static class TuplePairEvaluator implements ITuplePairComparator {
        private final IHyracksTaskContext ctx;
        // Created lazily by compare().
        private IScalarEvaluator condEvaluator;
        private final IScalarEvaluatorFactory condFactory;
        // Receives the binary result of the condition evaluation.
        private final IPointable p;
        private final CompositeFrameTupleReference compositeTupleRef;
        private final FrameTupleReference leftRef;
        private final FrameTupleReference rightRef;
        private final IBinaryBooleanInspector binaryBooleanInspector;

        public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
                IBinaryBooleanInspector binaryBooleanInspector) {
            this.ctx = ctx;
            this.condFactory = condFactory;
            this.binaryBooleanInspector = binaryBooleanInspector;
            this.p = VoidPointable.FACTORY.createPointable();
            this.leftRef = new FrameTupleReference();
            this.rightRef = new FrameTupleReference();
            this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
        }

        /**
         * Evaluates the join condition on the given pair of tuples.
         *
         * @return {@code 0} if the condition evaluates to true, {@code 1} otherwise
         * @throws HyracksDataException
         *             wrapping any {@link AlgebricksException} raised while creating or running
         *             the condition evaluator
         */
        @Override
        public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
                int innerIndex) throws HyracksDataException {
            try {
                if (condEvaluator == null) {
                    condEvaluator = condFactory.createScalarEvaluator(ctx);
                }
                compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
                condEvaluator.evaluate(compositeTupleRef, p);
            } catch (AlgebricksException ae) {
                throw new HyracksDataException(ae);
            }
            boolean matches = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
                    p.getLength());
            return matches ? 0 : 1;
        }
    }

    /**
     * Presents two tuple references as a single tuple whose fields are those of the left
     * reference followed by those of the right reference.
     * <p>
     * There is no single backing frame, so {@link #getFrameTupleAccessor()} and
     * {@link #getTupleIndex()} are not supported.
     */
    public static class CompositeFrameTupleReference implements IFrameTupleReference {

        private final FrameTupleReference refLeft;
        private final FrameTupleReference refRight;

        public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
            this.refLeft = refLeft;
            this.refRight = refRight;
        }

        /** Points the left reference at the outer tuple and the right reference at the inner tuple. */
        public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
                int innerIndex) {
            refLeft.reset(outerAccessor, outerIndex);
            refRight.reset(innerAccessor, innerIndex);
        }

        @Override
        public int getFieldCount() {
            return refLeft.getFieldCount() + refRight.getFieldCount();
        }

        @Override
        public byte[] getFieldData(int fIdx) {
            int leftFieldCount = refLeft.getFieldCount();
            if (fIdx < leftFieldCount) {
                return refLeft.getFieldData(fIdx);
            }
            // Indices past the left tuple's fields address the right tuple.
            return refRight.getFieldData(fIdx - leftFieldCount);
        }

        @Override
        public int getFieldStart(int fIdx) {
            int leftFieldCount = refLeft.getFieldCount();
            if (fIdx < leftFieldCount) {
                return refLeft.getFieldStart(fIdx);
            }
            return refRight.getFieldStart(fIdx - leftFieldCount);
        }

        @Override
        public int getFieldLength(int fIdx) {
            int leftFieldCount = refLeft.getFieldCount();
            if (fIdx < leftFieldCount) {
                return refLeft.getFieldLength(fIdx);
            }
            return refRight.getFieldLength(fIdx - leftFieldCount);
        }

        /**
         * @throws NotImplementedException
         *             always
         */
        @Override
        public IFrameTupleAccessor getFrameTupleAccessor() {
            throw new NotImplementedException(
                    "getFrameTupleAccessor() is not supported by CompositeFrameTupleReference.");
        }

        /**
         * @throws NotImplementedException
         *             always
         */
        @Override
        public int getTupleIndex() {
            throw new NotImplementedException("getTupleIndex() is not supported by CompositeFrameTupleReference.");
        }

    }
}