Finished implementing performance-optimized MultiComparators.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_multicomparator_opt@2506 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 475f1c1..0d8be60 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -992,7 +992,7 @@
         protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws IndexException,
                 HyracksDataException {
             // New tuple should be strictly greater than last tuple.
-            if (cmp.compare(tuple, prevTuple) != 1) {
+            if (cmp.compare(tuple, prevTuple) <= 0) {
                 throw new UnsortedInputException("Input stream given to BTree bulk load is not sorted.");
             }
         }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/FieldLengthIgnoringMultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/FieldLengthIgnoringMultiComparator.java
new file mode 100644
index 0000000..4e43a34
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/FieldLengthIgnoringMultiComparator.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.common.ophelpers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * MultiComparator that always passes 0 as a tuple's field length. This may speed up comparisons.
+ */
+public class FieldLengthIgnoringMultiComparator extends MultiComparator {
+
+    public FieldLengthIgnoringMultiComparator(IBinaryComparator[] cmps) {
+        super(cmps);
+    }
+
+    @Override
+    public int compare(ITupleReference tupleA, ITupleReference tupleB) {
+        for (int i = 0; i < cmps.length; i++) {
+            int cmp = cmps[i].compare(tupleA.getFieldData(i), tupleA.getFieldStart(i), 0, tupleB.getFieldData(i),
+                    tupleB.getFieldStart(i), 0);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public int selectiveFieldCompare(ITupleReference tupleA, ITupleReference tupleB, int[] fields) {
+        for (int j = 0; j < cmps.length; j++) {
+            int i = fields[j];
+            int cmp = cmps[j].compare(tupleA.getFieldData(i), tupleA.getFieldStart(i), 0, tupleB.getFieldData(i),
+                    tupleB.getFieldStart(i), 0);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public int compare(ITupleReference tupleA, ITupleReference tupleB, int startFieldIndex) {
+        for (int i = 0; i < cmps.length; i++) {
+            int ix = startFieldIndex + i;
+            int cmp = cmps[i].compare(tupleA.getFieldData(ix), tupleA.getFieldStart(ix), 0, tupleB.getFieldData(ix),
+                    tupleB.getFieldStart(ix), 0);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public int fieldRangeCompare(ITupleReference tupleA, ITupleReference tupleB, int startFieldIndex, int numFields) {
+        for (int i = startFieldIndex; i < startFieldIndex + numFields; i++) {
+            int cmp = cmps[i].compare(tupleA.getFieldData(i), tupleA.getFieldStart(i), 0, tupleB.getFieldData(i),
+                    tupleB.getFieldStart(i), 0);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return 0;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/FieldLengthIgnoringSingleComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/FieldLengthIgnoringSingleComparator.java
new file mode 100644
index 0000000..d35297a
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/FieldLengthIgnoringSingleComparator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.common.ophelpers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * MultiComparator optimized for the special case where there is only a single comparator.
+ * Further speeds up comparisons by always passing 0 as the field's length.
+ */
+public class FieldLengthIgnoringSingleComparator extends MultiComparator {
+    private final IBinaryComparator cmp;
+
+    protected FieldLengthIgnoringSingleComparator(IBinaryComparator[] cmps) {
+        super(cmps);
+        this.cmp = cmps[0];
+    }
+
+    public int compare(ITupleReference tupleA, ITupleReference tupleB) {
+        return cmp.compare(tupleA.getFieldData(0), tupleA.getFieldStart(0), 0, tupleB.getFieldData(0),
+                tupleB.getFieldStart(0), 0);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
index 280bb41..a52ca3f 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
@@ -21,7 +21,7 @@
 
 public class MultiComparator {
 
-    private final IBinaryComparator[] cmps;
+    protected final IBinaryComparator[] cmps;
 
     public MultiComparator(IBinaryComparator[] cmps) {
         this.cmps = cmps;
@@ -31,10 +31,9 @@
         for (int i = 0; i < cmps.length; i++) {
             int cmp = cmps[i].compare(tupleA.getFieldData(i), tupleA.getFieldStart(i), tupleA.getFieldLength(i),
                     tupleB.getFieldData(i), tupleB.getFieldStart(i), tupleB.getFieldLength(i));
-            if (cmp < 0)
-                return -1;
-            else if (cmp > 0)
-                return 1;
+            if (cmp != 0) {
+                return cmp;
+            }
         }
         return 0;
     }
@@ -44,38 +43,32 @@
             int i = fields[j];
             int cmp = cmps[j].compare(tupleA.getFieldData(i), tupleA.getFieldStart(i), tupleA.getFieldLength(i),
                     tupleB.getFieldData(i), tupleB.getFieldStart(i), tupleB.getFieldLength(i));
-            if (cmp < 0)
-                return -1;
-            else if (cmp > 0)
-                return 1;
+            if (cmp != 0) {
+                return cmp;
+            }
         }
         return 0;
     }
-	
-	public int compare(ITupleReference tupleA,
-			ITupleReference tupleB, int startFieldIndex) {
-		for (int i = 0; i < cmps.length; i++) {
-			int ix = startFieldIndex + i;
-			int cmp = cmps[i].compare(tupleA.getFieldData(ix),
-					tupleA.getFieldStart(ix), tupleA.getFieldLength(ix),
-					tupleB.getFieldData(ix), tupleB.getFieldStart(ix),
-					tupleB.getFieldLength(ix));
-			if (cmp < 0)
-				return -1;
-			else if (cmp > 0)
-				return 1;
-		}
-		return 0;
-	}
+
+    public int compare(ITupleReference tupleA, ITupleReference tupleB, int startFieldIndex) {
+        for (int i = 0; i < cmps.length; i++) {
+            int ix = startFieldIndex + i;
+            int cmp = cmps[i].compare(tupleA.getFieldData(ix), tupleA.getFieldStart(ix), tupleA.getFieldLength(ix),
+                    tupleB.getFieldData(ix), tupleB.getFieldStart(ix), tupleB.getFieldLength(ix));
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return 0;
+    }
 
     public int fieldRangeCompare(ITupleReference tupleA, ITupleReference tupleB, int startFieldIndex, int numFields) {
         for (int i = startFieldIndex; i < startFieldIndex + numFields; i++) {
             int cmp = cmps[i].compare(tupleA.getFieldData(i), tupleA.getFieldStart(i), tupleA.getFieldLength(i),
                     tupleB.getFieldData(i), tupleB.getFieldStart(i), tupleB.getFieldLength(i));
-            if (cmp < 0)
-                return -1;
-            else if (cmp > 0)
-                return 1;
+            if (cmp != 0) {
+                return cmp;
+            }
         }
         return 0;
     }
@@ -93,17 +86,37 @@
         for (int i = 0; i < cmpFactories.length; i++) {
             cmps[i] = cmpFactories[i].createBinaryComparator();
         }
-        return new MultiComparator(cmps);
+        if (cmps.length == 1) {
+            return new SingleComparator(cmps);
+        } else {
+            return new MultiComparator(cmps);
+        }
     }
-    
+
+    public static MultiComparator createIgnoreFieldLength(IBinaryComparatorFactory[] cmpFactories) {
+        IBinaryComparator[] cmps = new IBinaryComparator[cmpFactories.length];
+        for (int i = 0; i < cmpFactories.length; i++) {
+            cmps[i] = cmpFactories[i].createBinaryComparator();
+        }
+        if (cmps.length == 1) {
+            return new FieldLengthIgnoringSingleComparator(cmps);
+        } else {
+            return new FieldLengthIgnoringSingleComparator(cmps);
+        }
+    }
+
     public static MultiComparator create(IBinaryComparatorFactory[] cmpFactories, int startIndex, int numCmps) {
         IBinaryComparator[] cmps = new IBinaryComparator[numCmps];
         for (int i = startIndex; i < startIndex + numCmps; i++) {
             cmps[i] = cmpFactories[i].createBinaryComparator();
         }
-        return new MultiComparator(cmps);
+        if (cmps.length == 1) {
+            return new SingleComparator(cmps);
+        } else {
+            return new MultiComparator(cmps);
+        }
     }
-    
+
     public static MultiComparator create(IBinaryComparatorFactory[]... cmpFactories) {
         int size = 0;
         for (int i = 0; i < cmpFactories.length; i++) {
@@ -116,6 +129,10 @@
                 cmps[x++] = cmpFactories[i][j].createBinaryComparator();
             }
         }
-        return new MultiComparator(cmps);
+        if (cmps.length == 1) {
+            return new SingleComparator(cmps);
+        } else {
+            return new MultiComparator(cmps);
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/SingleComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/SingleComparator.java
new file mode 100644
index 0000000..c8841f3
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/SingleComparator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.common.ophelpers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * MultiComparator optimized for the special case where there is only a single comparator.
+ */
+public class SingleComparator extends MultiComparator {
+
+    private final IBinaryComparator cmp;
+
+    protected SingleComparator(IBinaryComparator[] cmps) {
+        super(cmps);
+        this.cmp = cmps[0];
+    }
+
+    public int compare(ITupleReference tupleA, ITupleReference tupleB) {
+        return cmp.compare(tupleA.getFieldData(0), tupleA.getFieldStart(0), tupleA.getFieldLength(0),
+                tupleB.getFieldData(0), tupleB.getFieldStart(0), tupleB.getFieldLength(0));
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
index 18ed73c..fbdfd64 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
@@ -39,7 +39,7 @@
     protected SearchResult newSearchResult;
 
     public InvertedListMerger(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.invListCmp = MultiComparator.createIgnoreFieldLength(invIndex.getInvListCmpFactories());
         this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
         this.newSearchResult = new SearchResult(prevSearchResult);
     }