/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hyracks.dataflow.std.group;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Logger;

import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;

/**
 * This is an implementation of the external hash group operator.
 * 
 * The motivation of this operator is that when tuples are processed in
 * parallel, distinguished aggregating keys partitioned on one node may exceed
 * the main memory, so aggregation results should be output onto the disk to
 * make space for aggregating more input tuples.
 * 
 * 
 */
public class ExternalHashGroupOperatorDescriptor extends
		AbstractOperatorDescriptor {

	/**
	 * The input frame identifier (in the job environment)
	 */
	private static final String GROUPTABLES = "gtables";

	/**
	 * The runs files identifier (in the job environment)
	 */
	private static final String RUNS = "runs";

	/**
	 * The fields used for grouping (grouping keys).
	 */
	private final int[] keyFields;

	/**
	 * The comparator for checking the grouping conditions, corresponding to the
	 * {@link #keyFields}.
	 */
	private final IBinaryComparatorFactory[] comparatorFactories;

	/**
	 * The aggregator factory for the aggregating field, corresponding to the
	 * {@link #aggregateFields}.
	 */
	private IAccumulatingAggregatorFactory aggregatorFactory;

	/**
	 * The maximum number of frames in the main memory.
	 */
	private final int framesLimit;

	/**
	 * Indicate whether the final output will be sorted or not.
	 */
	private final boolean sortOutput;

	/**
	 * Partition computer factory
	 */
	private final ITuplePartitionComputerFactory tpcf;

	/**
	 * The size of the in-memory table, which should be specified now by the
	 * creator of this operator descriptor.
	 */
	private final int tableSize;

	/**
	 * XXX Logger for debug information
	 */
	private static Logger LOGGER = Logger
			.getLogger(ExternalHashGroupOperatorDescriptor.class.getName());

	/**
	 * Constructor of the external hash group operator descriptor.
	 * 
	 * @param spec
	 * @param keyFields
	 *            The fields as keys of grouping.
	 * @param framesLimit
	 *            The maximum number of frames to be used in memory.
	 * @param sortOutput
	 *            Whether the output should be sorted or not. Note that if the
	 *            input data is large enough for external grouping, the output
	 *            will be sorted surely. The only case that when the output is
	 *            not sorted is when the size of the input data can be grouped
	 *            in memory and this parameter is false.
	 * @param tpcf
	 *            The partitioner.
	 * @param comparatorFactories
	 *            The comparators.
	 * @param aggregatorFactory
	 *            The aggregators.
	 * @param recordDescriptor
	 *            The record descriptor for the input data.
	 * @param tableSize
	 *            The maximum size of the in memory table usable to this
	 *            operator.
	 */
	public ExternalHashGroupOperatorDescriptor(JobSpecification spec,
			int[] keyFields, int framesLimit, boolean sortOutput,
			ITuplePartitionComputerFactory tpcf,
			IBinaryComparatorFactory[] comparatorFactories,
			IAccumulatingAggregatorFactory aggregatorFactory,
			RecordDescriptor recordDescriptor, int tableSize) {
		super(spec, 1, 1);
		this.framesLimit = framesLimit;
		if (framesLimit <= 1) {
			// Minimum of 2 frames: 1 for input records, and 1 for output
			// aggregation results.
			throw new IllegalStateException();
		}
		this.aggregatorFactory = aggregatorFactory;
		this.keyFields = keyFields;
		this.comparatorFactories = comparatorFactories;

		this.sortOutput = sortOutput;

		this.tpcf = tpcf;

		this.tableSize = tableSize;

		// Set the record descriptor. Note that since this operator is a unary
		// operator,
		// only the first record descritpor is used here.
		recordDescriptors[0] = recordDescriptor;
	}

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
	 * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
	 */
	@Override
	public void contributeTaskGraph(IActivityGraphBuilder builder) {
		PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
		MergeActivity mergeAct = new MergeActivity();

		builder.addTask(partialAggAct);
		builder.addSourceEdge(0, partialAggAct, 0);

		builder.addTask(mergeAct);
		builder.addTargetEdge(0, mergeAct, 0);

		// FIXME Block or not?
		builder.addBlockingEdge(partialAggAct, mergeAct);

	}

	private class PartialAggregateActivity extends AbstractActivityNode {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;

		@Override
		public IOperatorNodePushable createPushRuntime(
				final IHyracksStageletContext ctx,
				final IOperatorEnvironment env,
				final IRecordDescriptorProvider recordDescProvider,
				int partition, int nPartitions) {
			// Create the in-memory hash table
			final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(
					ctx, keyFields, comparatorFactories, tpcf,
					aggregatorFactory,
					recordDescProvider.getInputRecordDescriptor(
							getOperatorId(), 0), recordDescriptors[0],
					// Always take one frame for the input records
					framesLimit - 1, tableSize);
			// Create the tuple accessor
			final FrameTupleAccessor accessor = new FrameTupleAccessor(
					ctx.getFrameSize(),
					recordDescProvider.getInputRecordDescriptor(
							getOperatorId(), 0));
			// Create the partial aggregate activity node
			IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {

				/**
				 * Run files
				 */
				private LinkedList<RunFileReader> runs;

				@Override
				public void close() throws HyracksDataException {
					if (gTable.getFrameCount() >= 0) {
						if (runs.size() <= 0) {
							// All in memory
							env.set(GROUPTABLES, gTable);
						} else {
							// flush the memory into the run file.
							flushFramesToRun();
						}
					}
					env.set(RUNS, runs);
				}

				@Override
				public void flush() throws HyracksDataException {

				}

				/**
				 * Process the next input buffer.
				 * 
				 * The actual insertion is processed in {@link #gTable}. It will
				 * check whether it is possible to contain the data into the
				 * main memory or not. If not, it will indicate the operator to
				 * flush the content of the table into a run file.
				 */
				@Override
				public void nextFrame(ByteBuffer buffer)
						throws HyracksDataException {
					accessor.reset(buffer);
					int tupleCount = accessor.getTupleCount();
					for (int i = 0; i < tupleCount; i++) {
						// If the group table is too large, flush the table into
						// a run file.
						if (!gTable.insert(accessor, i)) {
							flushFramesToRun();
							if (!gTable.insert(accessor, i))
								throw new HyracksDataException(
										"Failed to insert a new buffer into the aggregate operator!");
						}
					}

				}

				@Override
				public void open() throws HyracksDataException {
					runs = new LinkedList<RunFileReader>();
					gTable.reset();
				}

				/**
				 * Flush the content of the group table into a run file.
				 * 
				 * During the flushing, the hash table will be sorted as first.
				 * After that, a run file handler is initialized and the hash
				 * table is flushed into the run file.
				 * 
				 * @throws HyracksDataException
				 */
				private void flushFramesToRun() throws HyracksDataException {
					// Sort the contents of the hash table.
					gTable.sortFrames();
					FileReference runFile;
					try {
						runFile = ctx.getJobletContext().createWorkspaceFile(
								ExternalHashGroupOperatorDescriptor.class
										.getSimpleName());
					} catch (IOException e) {
						throw new HyracksDataException(e);
					}
					RunFileWriter writer = new RunFileWriter(runFile,
							ctx.getIOManager());
					writer.open();
					try {
						gTable.flushFrames(writer, true);
					} catch (Exception ex) {
						throw new HyracksDataException(ex);
					} finally {
						writer.close();
					}
					gTable.reset();
					runs.add(((RunFileWriter) writer).createReader());
					LOGGER.warning("Created run file: "
							+ runFile.getFile().getAbsolutePath());
				}

			};

			return op;
		}

		@Override
		public IOperatorDescriptor getOwner() {
			return ExternalHashGroupOperatorDescriptor.this;
		}

	}

	private class MergeActivity extends AbstractActivityNode {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;

		@Override
		public IOperatorNodePushable createPushRuntime(
				final IHyracksStageletContext ctx,
				final IOperatorEnvironment env,
				IRecordDescriptorProvider recordDescProvider, int partition,
				int nPartitions) {
			final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
			for (int i = 0; i < comparatorFactories.length; ++i) {
				comparators[i] = comparatorFactories[i]
						.createBinaryComparator();
			}
			IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
				/**
				 * Input frames, one for each run file.
				 */
				private List<ByteBuffer> inFrames;

				/**
				 * Output frame.
				 */
				private ByteBuffer outFrame;

				/**
				 * List of the run files to be merged
				 */
				LinkedList<RunFileReader> runs;

				/**
				 * Tuple appender for the output frame {@link #outFrame}.
				 */
				private FrameTupleAppender outFrameAppender;

				private ISpillableAccumulatingAggregator visitingAggregator;
				private ArrayTupleBuilder visitingKeyTuple;

				@SuppressWarnings("unchecked")
				@Override
				public void initialize() throws HyracksDataException {
					runs = (LinkedList<RunFileReader>) env.get(RUNS);
					writer.open();

					try {
						if (runs.size() <= 0) {
							// If the aggregate results can be fit into
							// memory...
							SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env
									.get(GROUPTABLES);
							if (gTable != null) {
								gTable.flushFrames(writer, sortOutput);
							}
							env.set(GROUPTABLES, null);
						} else {
							// Otherwise, merge the run files into a single file
							inFrames = new ArrayList<ByteBuffer>();
							outFrame = ctx.allocateFrame();
							outFrameAppender = new FrameTupleAppender(
									ctx.getFrameSize());
							outFrameAppender.reset(outFrame, true);
							for (int i = 0; i < framesLimit - 1; ++i) {
								inFrames.add(ctx.allocateFrame());
							}
							int passCount = 0;
							while (runs.size() > 0) {
								passCount++;
								try {
									doPass(runs, passCount);
								} catch (Exception e) {
									throw new HyracksDataException(e);
								}
							}
						}

					} finally {
						writer.close();
					}
					env.set(RUNS, null);
				}

				/**
				 * Merge the run files once.
				 * 
				 * @param runs
				 * @param passCount
				 * @throws HyracksDataException
				 * @throws IOException
				 */
				private void doPass(LinkedList<RunFileReader> runs,
						int passCount) throws HyracksDataException, IOException {
					FileReference newRun = null;
					IFrameWriter writer = this.writer;
					boolean finalPass = false;

					int[] storedKeys = new int[keyFields.length];
					// Get the list of the fields in the stored records.
					for (int i = 0; i < keyFields.length; ++i) {
						storedKeys[i] = i;
					}

					// Release the space not used
					if (runs.size() + 1 <= framesLimit) {
						// If there are run files no more than the available
						// frame slots...
						// No run file to be generated, since the result can be
						// directly
						// outputted into the output frame for write.
						finalPass = true;
						for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
							inFrames.remove(i);
						}
					} else {
						// Otherwise, a new run file will be created
						newRun = ctx.getJobletContext().createWorkspaceFile(
								ExternalHashGroupOperatorDescriptor.class
										.getSimpleName());
						writer = new RunFileWriter(newRun, ctx.getIOManager());
						writer.open();
					}
					try {
						// Create run file read handler for each input frame
						RunFileReader[] runFileReaders = new RunFileReader[inFrames
								.size()];
						// Create input frame accessor
						FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
								.size()];
						Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
						ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
								ctx.getFrameSize(), recordDescriptors[0],
								inFrames.size(), comparator);
						// For the index of tuples visited in each frame.
						int[] tupleIndexes = new int[inFrames.size()];
						for (int i = 0; i < inFrames.size(); i++) {
							tupleIndexes[i] = 0;
							int runIndex = topTuples.peek().getRunid();
							runFileReaders[runIndex] = runs.get(runIndex);
							runFileReaders[runIndex].open();
							// Load the first frame of the file into the main
							// memory
							if (runFileReaders[runIndex].nextFrame(inFrames
									.get(runIndex))) {
								// initialize the tuple accessor for the frame
								tupleAccessors[runIndex] = new FrameTupleAccessor(
										ctx.getFrameSize(),
										recordDescriptors[0]);
								tupleAccessors[runIndex].reset(inFrames
										.get(runIndex));
								setNextTopTuple(runIndex, tupleIndexes,
										runFileReaders, tupleAccessors,
										topTuples);
							} else {
								closeRun(runIndex, runFileReaders,
										tupleAccessors);
							}
						}
						// Merge
						// Get a key holder for the current working
						// aggregator keys
						visitingAggregator = null;
						visitingKeyTuple = null;
						// Loop on all run files, and update the key
						// holder.
						while (!topTuples.areRunsExhausted()) {
							// Get the top record
							ReferenceEntry top = topTuples.peek();
							int tupleIndex = top.getTupleIndex();
							int runIndex = topTuples.peek().getRunid();
							FrameTupleAccessor fta = top.getAccessor();
							if (visitingAggregator == null) {
								// Initialize the aggregator
								visitingAggregator = aggregatorFactory
										.createSpillableAggregator(ctx,
												recordDescriptors[0],
												recordDescriptors[0]);
								// Initialize the partial aggregation result
								visitingAggregator.initFromPartial(fta,
										tupleIndex, keyFields);
								visitingKeyTuple = new ArrayTupleBuilder(
										recordDescriptors[0].getFields().length);
								for (int i = 0; i < keyFields.length; i++) {
									visitingKeyTuple.addField(fta, tupleIndex,
											keyFields[i]);
								}
							} else {
								if (compareTupleWithFrame(visitingKeyTuple,
										fta, tupleIndex, storedKeys, keyFields,
										comparators) == 0) {
									// If the two partial results are on the
									// same key
									visitingAggregator.accumulatePartialResult(
											fta, tupleIndex, keyFields);
								} else {
									// Otherwise, write the partial result back
									// to the output frame
									if (!visitingAggregator.output(
											outFrameAppender, visitingKeyTuple)) {
										FrameUtils.flushFrame(outFrame, writer);
										outFrameAppender.reset(outFrame, true);
										if (!visitingAggregator.output(
												outFrameAppender,
												visitingKeyTuple)) {
											throw new IllegalStateException();
										}
									}
									// Reset the partial aggregation result
									visitingAggregator.initFromPartial(fta,
											tupleIndex, keyFields);
									visitingKeyTuple.reset();
									for (int i = 0; i < keyFields.length; i++) {
										visitingKeyTuple.addField(fta,
												tupleIndex, keyFields[i]);
									}
								}
							}
							tupleIndexes[runIndex]++;
							setNextTopTuple(runIndex, tupleIndexes,
									runFileReaders, tupleAccessors, topTuples);
						}
						// Output the last aggregation result in the frame
						if (visitingAggregator != null) {
							if (!visitingAggregator.output(outFrameAppender,
									visitingKeyTuple)) {
								FrameUtils.flushFrame(outFrame, writer);
								outFrameAppender.reset(outFrame, true);
								if (!visitingAggregator.output(
										outFrameAppender, visitingKeyTuple)) {
									throw new IllegalStateException();
								}
							}
						}
						// Output data into run file writer after all tuples
						// have been checked
						if (outFrameAppender.getTupleCount() > 0) {
							FrameUtils.flushFrame(outFrame, writer);
							outFrameAppender.reset(outFrame, true);
						}
						// empty the input frames
						runs.subList(0, inFrames.size()).clear();
						// insert the new run file into the beginning of the run
						// file list
						if (!finalPass) {
							runs.add(0, ((RunFileWriter) writer).createReader());
						}
					} catch (Exception ex) {
						throw new HyracksDataException(ex);
					} finally {
						if (!finalPass) {
							writer.close();
						}
					}
				}

				/**
				 * Insert the tuple into the priority queue.
				 * 
				 * @param runIndex
				 * @param tupleIndexes
				 * @param runCursors
				 * @param tupleAccessors
				 * @param topTuples
				 * @throws IOException
				 */
				private void setNextTopTuple(int runIndex, int[] tupleIndexes,
						RunFileReader[] runCursors,
						FrameTupleAccessor[] tupleAccessors,
						ReferencedPriorityQueue topTuples) throws IOException {
					boolean exists = hasNextTuple(runIndex, tupleIndexes,
							runCursors, tupleAccessors);
					if (exists) {
						topTuples.popAndReplace(tupleAccessors[runIndex],
								tupleIndexes[runIndex]);
					} else {
						topTuples.pop();
						closeRun(runIndex, runCursors, tupleAccessors);
					}
				}

				/**
				 * Check whether there are any more tuples to be checked for the
				 * given run file from the corresponding input frame.
				 * 
				 * If the input frame for this run file is exhausted, load a new
				 * frame of the run file into the input frame.
				 * 
				 * @param runIndex
				 * @param tupleIndexes
				 * @param runCursors
				 * @param tupleAccessors
				 * @return
				 * @throws IOException
				 */
				private boolean hasNextTuple(int runIndex, int[] tupleIndexes,
						RunFileReader[] runCursors,
						FrameTupleAccessor[] tupleAccessors) throws IOException {

					if (tupleAccessors[runIndex] == null
							|| runCursors[runIndex] == null) {
						/*
						 * Return false if the targeting run file is not
						 * available, or the frame for the run file is not
						 * available.
						 */
						return false;
					} else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex]
							.getTupleCount()) {
						/*
						 * If all tuples in the targeting frame have been
						 * checked.
						 */
						ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
						// Refill the buffer with contents from the run file.
						if (runCursors[runIndex].nextFrame(buf)) {
							tupleIndexes[runIndex] = 0;
							return hasNextTuple(runIndex, tupleIndexes,
									runCursors, tupleAccessors);
						} else {
							return false;
						}
					} else {
						return true;
					}
				}

				/**
				 * Close the run file, and also the corresponding readers and
				 * input frame.
				 * 
				 * @param index
				 * @param runCursors
				 * @param tupleAccessor
				 * @throws HyracksDataException
				 */
				private void closeRun(int index, RunFileReader[] runCursors,
						IFrameTupleAccessor[] tupleAccessor)
						throws HyracksDataException {
					runCursors[index].close();
					runCursors[index] = null;
					tupleAccessor[index] = null;
				}

				/**
				 * Compare a tuple (in the format of a {@link ArrayTupleBuilder}
				 * ) with a record in a frame (in the format of a
				 * {@link FrameTupleAccessor}). Comparing keys and comparators
				 * are specified for this method as inputs.
				 * 
				 * @param tuple0
				 * @param accessor1
				 * @param tIndex1
				 * @param keys0
				 * @param keys1
				 * @param comparators
				 * @return
				 */
				private int compareTupleWithFrame(ArrayTupleBuilder tuple0,
						FrameTupleAccessor accessor1, int tIndex1, int[] keys0,
						int[] keys1, IBinaryComparator[] comparators) {
					int tStart1 = accessor1.getTupleStartOffset(tIndex1);
					int fStartOffset1 = accessor1.getFieldSlotsLength()
							+ tStart1;

					for (int i = 0; i < keys0.length; ++i) {
						int fIdx0 = keys0[i];
						int fStart0 = (i == 0 ? 0
								: tuple0.getFieldEndOffsets()[fIdx0 - 1]);
						int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
						int fLen0 = fEnd0 - fStart0;

						int fIdx1 = keys1[i];
						int fStart1 = accessor1.getFieldStartOffset(tIndex1,
								fIdx1);
						int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
						int fLen1 = fEnd1 - fStart1;

						int c = comparators[i].compare(tuple0.getByteArray(),
								fStart0, fLen0, accessor1.getBuffer().array(),
								fStart1 + fStartOffset1, fLen1);
						if (c != 0) {
							return c;
						}
					}
					return 0;
				}
			};
			return op;
		}

		@Override
		public IOperatorDescriptor getOwner() {
			return ExternalHashGroupOperatorDescriptor.this;
		}

		private Comparator<ReferenceEntry> createEntryComparator(
				final IBinaryComparator[] comparators) {
			return new Comparator<ReferenceEntry>() {
				public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
					FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1
							.getAccessor();
					FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2
							.getAccessor();
					int j1 = (Integer) tp1.getTupleIndex();
					int j2 = (Integer) tp2.getTupleIndex();
					byte[] b1 = fta1.getBuffer().array();
					byte[] b2 = fta2.getBuffer().array();
					for (int f = 0; f < keyFields.length; ++f) {
						int fIdx = keyFields[f];
						int s1 = fta1.getTupleStartOffset(j1)
								+ fta1.getFieldSlotsLength()
								+ fta1.getFieldStartOffset(j1, fIdx);
						int l1 = fta1.getFieldEndOffset(j1, fIdx)
								- fta1.getFieldStartOffset(j1, fIdx);
						int s2 = fta2.getTupleStartOffset(j2)
								+ fta2.getFieldSlotsLength()
								+ fta2.getFieldStartOffset(j2, fIdx);
						int l2 = fta2.getFieldEndOffset(j2, fIdx)
								- fta2.getFieldStartOffset(j2, fIdx);
						int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
						if (c != 0) {
							return c;
						}
					}
					return 0;
				}
			};
		}

	}
}
