Removed commented overlog code. Added auto-expand test

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@24 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index 8b7bcff..3f03526 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -122,48 +122,6 @@
 watch(availablenodes, i);
 watch(availablenodes, d);
 
-/*
-
-define(rankedavailablenodes_temp, keys(), {String, Integer});
-
-rankedavailablenodes_temp(NodeId, 0) :-
-    availablenodes#insert(NodeId);
-
-rankedavailablenodes_temp(NodeId2, NewRank) :-
-    rankedavailablenodes_temp#insert(NodeId1, Rank),
-    rankedavailablenodes_temp(NodeId2, Rank),
-    NodeId1 < NodeId2
-    {
-        NewRank := Rank + 1;
-    };
-
-rankedavailablenodes_temp(NodeId2, NewRank) :-
-    rankedavailablenodes_temp(NodeId1, Rank),
-    rankedavailablenodes_temp#insert(NodeId2, Rank),
-    NodeId1 < NodeId2
-    {
-        NewRank := Rank + 1;
-    };
-
-rankedavailablenodes_temp(NodeId, Rank) :-
-    availablenodes(NodeId),
-    rankedavailablenodes_temp(NodeId, Rank);
-
-rankedavailablenodes_temp(NodeId, NewRank) :-
-    rankedavailablenodes_temp(NodeId, Rank),
-    Rank != 0,
-    notin rankedavailablenodes_temp(_, Rank - 1)
-    {
-        NewRank := Rank - 1;
-    };
-
-define(rankedavailablenodes, keys(0), {String, Integer});
-
-rankedavailablenodes(NodeId, max<Rank>) :-
-    rankedavailablenodes_temp(NodeId, Rank);
-
-*/
-
 define(availablenodecount, keys(0), {Integer, Integer});
 
 watch(availablenodecount, a);
@@ -213,41 +171,8 @@
 operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
     operatorclonecount(JobId, OperatorId, NPartitions);
 
-/*
-define(operatorclonecountexpansion, keys(0, 1), {UUID, OperatorDescriptorId, Integer});
-
-operatorclonecountexpansion(JobId, OperatorId, 0) :-
-    operatorclonecount(JobId, OperatorId, _);
-
-operatorclonecountexpansion(JobId, OperatorId, Partition + 1) :-
-    operatorclonecountexpansion#insert(JobId, OperatorId, Partition),
-    operatorclonecount(JobId, OperatorId, NPartitions),
-    Partition < NPartitions - 1;
-*/
-
 define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
 
-/*
-operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, 0) :-
-    operatorclonecountexpansion#insert(JobId, OperatorId, Partition);
-
-operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
-    operatorclonecountexpansiontotalorder#insert(JobId, OperatorId1, Partition1, Rank),
-    operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, Rank),
-    OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
-    {
-        NewRank := Rank + 1;
-    };
-
-operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
-    operatorclonecountexpansiontotalorder(JobId, OperatorId1, Partition1, Rank),
-    operatorclonecountexpansiontotalorder#insert(JobId, OperatorId2, Partition2, Rank),
-    OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
-    {
-        NewRank := Rank + 1;
-    };
-*/
-
 operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
     expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
 
@@ -259,12 +184,6 @@
 watch(operatorclonecount, i);
 watch(operatorclonecount, d);
 
-/*
-watch(operatorclonecountexpansion, a);
-watch(operatorclonecountexpansion, i);
-watch(operatorclonecountexpansion, d);
-*/
-
 define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
 
 activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 7494f11..6770fd8 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -259,6 +260,115 @@
     }
 
     @Test
+    public void customerOrderCIDJoinAutoExpand() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+        };
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
+
+        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
+        custScanner.setPartitionConstraint(custPartitionConstraint);
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        join.setPartitionConstraint(new PartitionCountConstraint(2));
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
+        printer.setPartitionConstraint(printerPartitionConstraint);
+
+        IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
     public void customerOrderCIDJoinMultiMaterialized() throws Exception {
         JobSpecification spec = new JobSpecification();