/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hyracks.dataflow.std.group;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;

/**
 * Factory for hash-based {@link ISpillableTable} instances.
 * <p>
 * Each table built by this factory keeps one aggregation state tuple per
 * group in a bounded list of in-memory frames, and indexes those tuples with
 * a {@link SerializableHashTable} of {@code tableSize} entries. When no more
 * frames may be used, {@code insert} reports failure so that the caller can
 * flush the content and reset the table.
 */
public class HashSpillableTableFactory implements ISpillableTableFactory {

    private static final long serialVersionUID = 1L;
    private final ITuplePartitionComputerFactory tpcf;
    private final int tableSize;

    /**
     * @param tpcf
     *            factory of the partitioner used to map an input tuple to a
     *            hash table entry.
     * @param tableSize
     *            number of entries of the hash table of every table built by
     *            this factory.
     */
    public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
        this.tpcf = tpcf;
        this.tableSize = tableSize;
    }

    /**
     * Builds a new spillable hash table.
     * 
     * @param ctx
     *            task context, used for frame allocation and frame size.
     * @param keyFields
     *            positions of the grouping keys in the input tuples.
     * @param comparatorFactories
     *            factories of the comparators for the grouping keys.
     * @param firstKeyNormalizerFactory
     *            optional (may be {@code null}) factory of the normalized key
     *            computer applied to the first key when sorting.
     * @param aggregateFactory
     *            factory of the aggregator maintaining the group states.
     * @param inRecordDescriptor
     *            descriptor of the input tuples.
     * @param outRecordDescriptor
     *            descriptor of the output tuples; it is also used to read the
     *            state tuples stored in the table.
     * @param framesLimit
     *            maximum number of frames the table may use for its states.
     * @return the new table. It starts with no working frame; the first
     *         {@code insert} sets one up.
     * @throws HyracksDataException
     */
    @Override
    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, final int[] keyFields,
            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
        // In the stored (state) tuples the keys always occupy the leading
        // fields, in the same order as in keyFields.
        final int[] storedKeys = new int[keyFields.length];
        for (int i = 0; i < keyFields.length; i++) {
            storedKeys[i] = i;
        }

        // Two accessors over the stored tuples: the sort needs to look at the
        // pivot tuple and at the tuple being compared at the same time.
        final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
                outRecordDescriptor);
        final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
                outRecordDescriptor);

        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
        for (int i = 0; i < comparatorFactories.length; ++i) {
            comparators[i] = comparatorFactories[i].createBinaryComparator();
        }

        // Compares an input tuple (keys at keyFields) with a stored tuple.
        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);

        // Compares two stored tuples.
        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);

        final ITuplePartitionComputer tpc = tpcf.createPartitioner();

        final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
                .createNormalizedKeyComputer();

        // Kept as an array of its own (rather than sharing storedKeys) since
        // it is handed over to the aggregator.
        int[] keyFieldsInPartialResults = new int[keyFields.length];
        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
            keyFieldsInPartialResults[i] = i;
        }

        final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
                outRecordDescriptor, keyFields, keyFieldsInPartialResults);

        final AggregateState aggregateState = aggregator.createAggregateStates();

        // When every output field is a key field, one extra field is reserved
        // in the state tuple builder.
        // NOTE(review): presumably the extra slot holds aggregate state that
        // is not part of the output - confirm against the aggregators.
        final int outFieldCount = outRecordDescriptor.getFields().length;
        final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(
                keyFields.length < outFieldCount ? outFieldCount : outFieldCount + 1);

        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outFieldCount);

        return new ISpillableTable() {

            /**
             * Index (in {@link #frames}) of the frame currently being filled,
             * or -1 when no frame is in use. It starts at -1 so that a freshly
             * built table is in the same state as a table after
             * {@link #reset()}.
             */
            private int lastBufIndex = -1;

            private ByteBuffer outputFrame;
            private FrameTupleAppender outputAppender;

            private final FrameTupleAppender stateAppender = new FrameTupleAppender(ctx.getFrameSize());

            private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
            private final TuplePointer storedTuplePointer = new TuplePointer();
            private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();

            /**
             * Sort index built by {@link #sortFrames()}. A stored tuple is
             * "pointed" to by 3 consecutive entries: [0] = entry index in the
             * hash table, [1] = offset of the tuple pointer within that
             * entry, [2] = poor man's normalized key for the tuple.
             * {@code null} while the content is not sorted.
             */
            private int[] tPointers;

            /**
             * Builds {@link #tPointers} over all the tuples referenced by the
             * hash table and sorts it on the stored keys.
             */
            @Override
            public void sortFrames() {
                int sfIdx = storedKeys[0];
                int totalTCount = table.getTupleCount();
                tPointers = new int[totalTCount * 3];
                int ptr = 0;

                for (int entry = 0; entry < tableSize; entry++) {
                    int offset = 0;
                    while (true) {
                        table.getTuplePointer(entry, offset, storedTuplePointer);
                        if (storedTuplePointer.frameIndex < 0) {
                            // A negative frame index marks the end of the entry.
                            break;
                        }
                        int base = ptr * 3;
                        tPointers[base] = entry;
                        tPointers[base + 1] = offset;

                        // Compute the normalized key from the first stored key.
                        int tIndex = storedTuplePointer.tupleIndex;
                        storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
                        int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
                        int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
                        int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
                        int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
                        tPointers[base + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
                                .array(), f0Start, f0EndRel - f0StartRel);
                        ptr++;
                        offset++;
                    }
                }
                // Sort using quick sort
                if (tPointers.length > 0) {
                    sort(tPointers, 0, totalTCount);
                }
            }

            /**
             * Empties the table for reuse. The frames allocated so far are
             * kept and recycled by the following insertions.
             */
            @Override
            public void reset() {
                lastBufIndex = -1;
                tPointers = null;
                table.reset();
                aggregator.reset();
            }

            /**
             * Inserts a tuple: either aggregates it into the state of its
             * group, or creates a new state tuple for a group seen for the
             * first time.
             * 
             * @param accessor
             *            accessor over the frame containing the input tuple.
             * @param tIndex
             *            index of the input tuple in that frame.
             * @return {@code false} when a new group state is needed but no
             *         further frame may be used; {@code true} otherwise.
             * @throws HyracksDataException
             *             if a new group state does not fit in an empty frame.
             */
            @Override
            public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                if (lastBufIndex < 0) {
                    // NOTE(review): the result is not checked; with a
                    // non-positive framesLimit there is no working frame.
                    nextAvailableFrame();
                }
                int entry = tpc.partition(accessor, tIndex, tableSize);

                // Look for the group of the tuple in its hash table entry.
                boolean foundGroup = false;
                int offset = 0;
                while (true) {
                    table.getTuplePointer(entry, offset++, storedTuplePointer);
                    if (storedTuplePointer.frameIndex < 0) {
                        break;
                    }
                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
                    if (ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex) == 0) {
                        foundGroup = true;
                        break;
                    }
                }

                if (foundGroup) {
                    // storedKeysAccessor1/storedTuplePointer still designate
                    // the matching state tuple.
                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex,
                            aggregateState);
                    return true;
                }

                // New group: build its state tuple (keys first, then the
                // initial aggregate state).
                stateTupleBuilder.reset();
                for (int k = 0; k < keyFields.length; k++) {
                    stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
                }
                aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);

                if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
                        stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
                    // The working frame is full: move on to the next one.
                    if (!nextAvailableFrame()) {
                        return false;
                    }
                    if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
                            stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
                        throw new HyracksDataException("Cannot init external aggregate state in a frame.");
                    }
                }

                storedTuplePointer.frameIndex = lastBufIndex;
                storedTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;
                table.insert(entry, storedTuplePointer);
                return true;
            }

            @Override
            public List<ByteBuffer> getFrames() {
                return frames;
            }

            /**
             * @return the index of the frame currently being filled, or -1
             *         when no frame is in use.
             */
            // NOTE(review): despite the name this is the last used index, not
            // a number of frames; left as is since callers may rely on it.
            @Override
            public int getFrameCount() {
                return lastBufIndex;
            }

            /**
             * Writes one output tuple per group to the writer, in sorted order
             * if {@link #sortFrames()} has been called since the last reset,
             * in hash table order otherwise. The aggregator is closed at the
             * end.
             * 
             * @param writer
             *            destination of the output frames.
             * @param isPartial
             *            whether partial results (rather than final results)
             *            are produced by the aggregator.
             * @throws HyracksDataException
             *             if an output tuple does not fit in an empty frame.
             */
            @Override
            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
                if (outputFrame == null) {
                    outputFrame = ctx.allocateFrame();
                }

                if (outputAppender == null) {
                    outputAppender = new FrameTupleAppender(outputFrame.capacity());
                }

                outputAppender.reset(outputFrame, true);

                if (tPointers == null) {
                    // Not sorted: walk the hash table entry by entry.
                    for (int entry = 0; entry < tableSize; ++entry) {
                        int offset = 0;
                        while (true) {
                            table.getTuplePointer(entry, offset++, storedTuplePointer);
                            if (storedTuplePointer.frameIndex < 0) {
                                break;
                            }
                            appendOutputTuple(writer, storedTuplePointer.frameIndex, storedTuplePointer.tupleIndex,
                                    isPartial);
                        }
                    }
                } else {
                    // Sorted: follow the sort index.
                    int n = tPointers.length / 3;
                    for (int ptr = 0; ptr < n; ptr++) {
                        table.getTuplePointer(tPointers[ptr * 3], tPointers[ptr * 3 + 1], storedTuplePointer);
                        appendOutputTuple(writer, storedTuplePointer.frameIndex, storedTuplePointer.tupleIndex,
                                isPartial);
                    }
                }

                // Flush the last, partially filled, output frame.
                if (outputAppender.getTupleCount() > 0) {
                    FrameUtils.flushFrame(outputFrame, writer);
                    outputAppender.reset(outputFrame, true);
                }
                aggregator.close();
            }

            /**
             * Builds the output tuple of the group whose state is stored at
             * the given position and appends it to the output frame, flushing
             * the output frame to the writer first when it is full.
             */
            private void appendOutputTuple(IFrameWriter writer, int frameIndex, int tupleIndex, boolean isPartial)
                    throws HyracksDataException {
                storedKeysAccessor1.reset(frames.get(frameIndex));

                outputTupleBuilder.reset();
                for (int k = 0; k < storedKeys.length; k++) {
                    outputTupleBuilder.addField(storedKeysAccessor1, tupleIndex, storedKeys[k]);
                }

                if (isPartial) {
                    aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
                            aggregateState);
                } else {
                    aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
                            aggregateState);
                }

                if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                        outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                    FrameUtils.flushFrame(outputFrame, writer);
                    outputAppender.reset(outputFrame, true);
                    if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                        throw new HyracksDataException("The output item is too large to be fit into a frame.");
                    }
                }
            }

            /**
             * Releases the content of the table: the hash table and the
             * aggregate state are closed and the frame list is cleared.
             */
            @Override
            public void close() {
                lastBufIndex = -1;
                tPointers = null;
                table.close();
                frames.clear();
                aggregateState.close();
            }

            /**
             * Set the working frame to the next available frame in the frame
             * list. There are two cases:<br>
             * 1) If the next frame already exists in the list (it was
             * allocated before a reset), it is recycled. 2) Otherwise a new
             * frame is allocated and added to the list.
             * 
             * @return Whether a working frame is set up successfully;
             *         {@code false} if the frames limit is reached.
             */
            private boolean nextAvailableFrame() {
                int nextIndex = lastBufIndex + 1;
                // Return false if the number of frames is equal to the limit.
                if (nextIndex >= framesLimit) {
                    return false;
                }

                ByteBuffer frame;
                if (nextIndex < frames.size()) {
                    // Reuse an old frame
                    frame = frames.get(nextIndex);
                    lastBufIndex = nextIndex;
                } else {
                    // Insert a new frame
                    frame = ctx.allocateFrame();
                    frames.add(frame);
                    lastBufIndex = frames.size() - 1;
                }
                frame.position(0);
                frame.limit(frame.capacity());
                stateAppender.reset(frame, true);
                return true;
            }

            /**
             * Quick sort (three-way partitioning) of the {@code length}
             * pointers of {@code tPointers} starting at pointer
             * {@code offset}. Pointers equal to the pivot are first gathered
             * at both ends of the range, then swapped back to the middle.
             */
            private void sort(int[] tPointers, int offset, int length) {
                // The pivot is the middle pointer; its values are captured up
                // front since the pointer itself may move during the swaps.
                int m = offset + (length >> 1);
                int mNormKey = tPointers[m * 3 + 2];

                table.getTuplePointer(tPointers[m * 3], tPointers[m * 3 + 1], storedTuplePointer);
                int mTuple = storedTuplePointer.tupleIndex;
                // storedKeysAccessor1 stays on the pivot frame for the whole
                // partitioning step.
                storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));

                int a = offset;
                int b = a;
                int c = offset + length - 1;
                int d = c;
                while (true) {
                    while (b <= c) {
                        int cmp = compareToPivot(tPointers, b, mNormKey, mTuple);
                        if (cmp > 0) {
                            break;
                        }
                        if (cmp == 0) {
                            swap(tPointers, a++, b);
                        }
                        ++b;
                    }
                    while (c >= b) {
                        int cmp = compareToPivot(tPointers, c, mNormKey, mTuple);
                        if (cmp < 0) {
                            break;
                        }
                        if (cmp == 0) {
                            swap(tPointers, c, d--);
                        }
                        --c;
                    }
                    if (b > c) {
                        break;
                    }
                    swap(tPointers, b++, c--);
                }

                // Move the pointers equal to the pivot back to the middle.
                int s;
                int n = offset + length;
                s = Math.min(a - offset, b - a);
                vecswap(tPointers, offset, b - s, s);
                s = Math.min(d - c, n - d - 1);
                vecswap(tPointers, b, n - s, s);

                // Recursively sort the "smaller" and the "greater" parts.
                if ((s = b - a) > 1) {
                    sort(tPointers, offset, s);
                }
                if ((s = d - c) > 1) {
                    sort(tPointers, n - s, s);
                }
            }

            /**
             * Compares the tuple designated by pointer {@code idx} with the
             * pivot tuple, which must be tuple {@code mTuple} of the frame
             * storedKeysAccessor1 is currently reset to. The normalized keys
             * are compared first, as unsigned integers; the full key
             * comparison is only done when they are equal.
             * 
             * @return a negative, zero or positive value when the tuple is
             *         smaller than, equal to or greater than the pivot.
             */
            private int compareToPivot(int[] tPointers, int idx, int mNormKey, int mTuple) {
                int normKey = tPointers[idx * 3 + 2];
                if (normKey != mNormKey) {
                    return ((((long) normKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
                }
                table.getTuplePointer(tPointers[idx * 3], tPointers[idx * 3 + 1], storedTuplePointer);
                storedKeysAccessor2.reset(frames.get(storedTuplePointer.frameIndex));
                return ftpcTuple.compare(storedKeysAccessor2, storedTuplePointer.tupleIndex, storedKeysAccessor1,
                        mTuple);
            }

            /**
             * Swaps pointers {@code a} and {@code b} (3 entries each).
             */
            private void swap(int x[], int a, int b) {
                for (int i = 0; i < 3; ++i) {
                    int t = x[a * 3 + i];
                    x[a * 3 + i] = x[b * 3 + i];
                    x[b * 3 + i] = t;
                }
            }

            /**
             * Swaps the {@code n} pointers starting at {@code a} with the
             * {@code n} pointers starting at {@code b}.
             */
            private void vecswap(int x[], int a, int b, int n) {
                for (int i = 0; i < n; i++, a++, b++) {
                    swap(x, a, b);
                }
            }

        };
    }

}
