Merged -r 525:527 from hyracks_dev_next into the hyracks_indexes branch.

The merged revisions clean up comments in the std aggregator factories and the external group operator: auto-generated Javadoc stubs, @author tags, TODO placeholders, and redundant inline comments are removed, and the remaining explanatory comments are converted to block comments.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@528 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
index 123fedf..c4d623c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
@@ -24,16 +24,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-/**
- * @author jarodwen
- */
 public class AvgAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
-    /**
-     * 
-     */
     private static final long serialVersionUID = 1L;
-
     private final int avgField;
     private int outField = -1;
 
@@ -46,9 +38,6 @@
         this.outField = outField;
     }
 
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
-     */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException {
@@ -120,7 +109,6 @@
 
             @Override
             public void close() {
-                // TODO Auto-generated method stub
 
             }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
index ef6a139..90edf4a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
@@ -28,15 +28,9 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 
-/**
- * @author jarodwen
- */
 public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
     private static final long serialVersionUID = 1L;
-
     private static final int INIT_ACCUMULATORS_SIZE = 8;
-
     private final int concatField;
     private int outField = -1;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
index 3a42fbc..c5f0a42 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
@@ -24,13 +24,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-/**
- * @author jarodwen
- */
 public class CountAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
     private static final long serialVersionUID = 1L;
-
     private int outField = -1;
 
     public CountAggregatorDescriptorFactory() {
@@ -40,9 +35,6 @@
         this.outField = outField;
     }
 
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
-     */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
@@ -103,7 +95,6 @@
 
             @Override
             public void reset() {
-                // TODO Auto-generated method stub
 
             }
         };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
index 85a182f..e07b123 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
@@ -24,12 +24,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-/**
- * @author jarodwen
- */
 public class IntSumAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
-
     private final int aggField;
     private int outField = -1;
 
@@ -115,8 +111,7 @@
 
             @Override
             public void reset() {
-                // TODO Auto-generated method stub
-
+
             }
         };
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
index d30b2b2..58d0da7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
@@ -20,22 +20,14 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-/**
- * @author jarodwen
- */
 public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
     private static final long serialVersionUID = 1L;
-
     private final IAggregatorDescriptorFactory[] aggregatorFactories;
 
     public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
         this.aggregatorFactories = aggregatorFactories;
     }
 
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
-     */
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksStageletContext ctx,
             final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index cdb0f4d..65f7f08 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -52,7 +52,6 @@
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
     private static final long serialVersionUID = 1L;
     /**
      * The input frame identifier (in the job environment)
@@ -78,8 +77,10 @@
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         if (framesLimit <= 1) {
-            // Minimum of 2 frames: 1 for input records, and 1 for output
-            // aggregation results.
+            /**
+             * Minimum of 2 frames: 1 for input records, and 1 for output
+             * aggregation results.
+             */
             throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
         }
 
@@ -91,9 +92,11 @@
         this.spillableTableFactory = spillableTableFactory;
         this.isOutputSorted = isOutputSorted;
 
-        // Set the record descriptor. Note that since
-        // this operator is a unary operator,
-        // only the first record descriptor is used here.
+        /**
+         * Set the record descriptor. Note that since
+         * this operator is a unary operator,
+         * only the first record descriptor is used here.
+         */
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -148,8 +151,10 @@
                     accessor.reset(buffer);
                     int tupleCount = accessor.getTupleCount();
                     for (int i = 0; i < tupleCount; i++) {
-                        // If the group table is too large, flush the table into
-                        // a run file.
+                        /**
+                         * If the group table is too large, flush the table into
+                         * a run file.
+                         */
                         if (!gTable.insert(accessor, i)) {
                             flushFramesToRun();
                             if (!gTable.insert(accessor, i))
@@ -168,10 +173,14 @@
                 public void close() throws HyracksDataException {
                     if (gTable.getFrameCount() >= 0) {
                         if (runs.size() <= 0) {
-                            // All in memory
+                            /**
+                             * All in memory
+                             */
                             env.set(GROUPTABLES, gTable);
                         } else {
-                            // flush the memory into the run file.
+                            /**
+                             * flush the memory into the run file.
+                             */
                             flushFramesToRun();
                             gTable.close();
                         }
@@ -225,11 +234,15 @@
             final IAggregatorDescriptor currentWorkingAggregator = mergeFactory.createAggregator(ctx,
                     recordDescriptors[0], recordDescriptors[0], keyFields);
             final int[] storedKeys = new int[keyFields.length];
-            // Get the list of the fields in the stored records.
+            /**
+             * Get the list of the fields in the stored records.
+             */
             for (int i = 0; i < keyFields.length; ++i) {
                 storedKeys[i] = i;
             }
-            // Tuple builder
+            /**
+             * Tuple builder
+             */
             final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -320,19 +333,20 @@
                     try {
                         currentFrameIndexInRun = new int[runNumber];
                         currentRunFrames = new int[runNumber];
-                        // Create file readers for each input run file, only
-                        // for the ones fit into the inFrames
+                        /**
+                         * Create file readers for each input run file, only
+                         * for the ones fit into the inFrames
+                         */
                         RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-                        // Create input frame accessor
                         FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-                        // Build a priority queue for extracting tuples in order
                         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-
                         ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
                                 recordDescriptors[0], runNumber, comparator);
-                        // Maintain a list of visiting index for all runs'
-                        // current frame
+                        /**
+                         * current tuple index in each run
+                         */
                         int[] tupleIndices = new int[runNumber];
+
                         for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
                             tupleIndices[runIndex] = 0;
                             // Load the run file
@@ -357,9 +371,13 @@
                             }
                         }
 
-                        // Start merging
+                        /**
+                         * Start merging
+                         */
                         while (!topTuples.areRunsExhausted()) {
-                            // Get the top record
+                            /**
+                             * Get the top record
+                             */
                             ReferenceEntry top = topTuples.peek();
                             int tupleIndex = top.getTupleIndex();
                             int runIndex = topTuples.peek().getRunid();
@@ -368,8 +386,10 @@
                             int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
                             if (currentTupleInOutFrame < 0
                                     || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
-                                // Initialize the first output record
-                                // Reset the tuple builder
+                                /**
+                                 * Initialize the first output record
+                                 * Reset the tuple builder
+                                 */
                                 tupleBuilder.reset();
                                 for (int i = 0; i < keyFields.length; i++) {
                                     tupleBuilder.addField(fta, tupleIndex, i);
@@ -378,9 +398,6 @@
                                 currentWorkingAggregator.init(fta, tupleIndex, tupleBuilder);
                                 if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
                                         tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                                    // Make sure that when the outFrame is being
-                                    // flushed, all results in it are in
-                                    // the correct state
                                     flushOutFrame(writer, finalPass);
                                     if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
                                             tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
@@ -388,9 +405,11 @@
                                                 "Failed to append an aggregation result to the output frame.");
                                 }
                             } else {
-                                // if new tuple is in the same group of the
-                                // current aggregator
-                                // do merge and output to the outFrame
+                                /**
+                                 * if new tuple is in the same group of the
+                                 * current aggregator
+                                 * do merge and output to the outFrame
+                                 */
                                 int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
                                 int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
                                         keyFields.length);
@@ -403,15 +422,17 @@
                             tupleIndices[runIndex]++;
                             setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
                         }
-                        // Flush the outFrame
+
                         if (outFrameAppender.getTupleCount() > 0) {
                             flushOutFrame(writer, finalPass);
                         }
-                        // After processing all records, flush the aggregator
+
                         currentWorkingAggregator.close();
                         runs.subList(0, runNumber).clear();
-                        // insert the new run file into the beginning of the run
-                        // file list
+                        /**
+                         * insert the new run file into the beginning of the run
+                         * file list
+                         */
                         if (!finalPass) {
                             runs.add(0, ((RunFileWriter) writer).createReader());
                         }
@@ -467,10 +488,14 @@
                     int runStart = runIndex * runFrameLimit;
                     boolean existNext = false;
                     if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
-                        // run already closed
+                        /**
+                         * run already closed
+                         */
                         existNext = false;
                     } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
-                        // not the last frame for this run
+                        /**
+                         * not the last frame for this run
+                         */
                         existNext = true;
                         if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
                             tupleIndices[runIndex] = 0;
@@ -478,7 +503,9 @@
                         }
                     } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
                             .getTupleCount()) {
-                        // the last frame has expired
+                        /**
+                         * the last frame has expired
+                         */
                         existNext = true;
                     } else {
                         /**
@@ -507,8 +534,7 @@
                             }
                         }
                     }
-                    // Check whether the run file for the given runIndex has
-                    // more tuples
+
                     if (existNext) {
                         topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
                                 tupleIndices[runIndex]);
@@ -539,10 +565,6 @@
                     byte[] b1 = fta1.getBuffer().array();
                     byte[] b2 = fta2.getBuffer().array();
                     for (int f = 0; f < keyFields.length; ++f) {
-                        // Note: Since the comparison is only used in the merge
-                        // phase,
-                        // all the keys are clustered at the beginning of the
-                        // tuple.
                         int fIdx = f;
                         int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
                                 + fta1.getFieldStartOffset(j1, fIdx);
@@ -575,10 +597,6 @@
                     byte[] b1 = fta1.getBuffer().array();
                     byte[] b2 = fta2.getBuffer().array();
                     for (int f = 0; f < keyFields.length; ++f) {
-                        // Note: Since the comparison is only used in the merge
-                        // phase,
-                        // all the keys are clustered at the beginning of the
-                        // tuple.
                         int fIdx = f;
                         int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
                                 + fta1.getFieldStartOffset(j1, fIdx);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
index a8b61fc..3cc2fad 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -42,7 +42,6 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
 public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
-
     private static final long serialVersionUID = 1L;
     private final ITuplePartitionComputerFactory tpcf;
     private final int tableSize;
@@ -152,8 +151,10 @@
                 } while (true);
 
                 if (!foundGroup) {
-                    // If no matching group is found, create a new aggregator
-                    // Create a tuple for the new group
+                    /**
+                     * If no matching group is found, create a new aggregator
+                     * Create a tuple for the new group
+                     */
                     internalTupleBuilder.reset();
                     for (int i = 0; i < keyFields.length; i++) {
                         internalTupleBuilder.addField(accessor, tIndex, keyFields[i]);
@@ -334,7 +335,9 @@
                         offset++;
                     } while (true);
                 }
-                // Sort using quick sort
+                /**
+                 * Sort using quick sort
+                 */
                 if (tPointers.length > 0) {
                     sort(tPointers, 0, totalTCount);
                 }
@@ -342,11 +345,10 @@
 
             private void sort(int[] tPointers, int offset, int length) {
                 int m = offset + (length >> 1);
-                // Get table index
                 int mTable = tPointers[m * 3];
                 int mRow = tPointers[m * 3 + 1];
                 int mNormKey = tPointers[m * 3 + 2];
-                // Get frame and tuple index
+
                 table.getTuplePointer(mTable, mRow, storedTuplePointer);
                 int mFrame = storedTuplePointer.frameIndex;
                 int mTuple = storedTuplePointer.tupleIndex;