Integrated more native MapReduce operators and a hash-partitioning shuffle connector

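Adds MapperOperatorDescriptor, ReducerOperatorDescriptor, and
HashPartitioningShuffleConnectorDescriptor, together with their supporting
classes (HadoopHelper, HadoopTools, MarshalledWritable, KVIterator,
ReduceWriter, InputFileSplit, the input-split provider interfaces, and
ShuffleFrameReader). Also removes the checked HyracksException from
IInputChannel.registerMonitor(), which no implementation actually threw.

A rough sketch of how the new pieces wire into a Hyracks job (illustrative
only, not committed code: jobId, ispFactory, and the Writable type arguments
are placeholders, and partition constraints and job submission are omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
    import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    static JobSpecification buildJob(org.apache.hadoop.mapreduce.Job hadoopJob, int jobId,
            IInputSplitProviderFactory ispFactory) throws Exception {
        // Marshal the Hadoop configuration so it can be shipped to the tasks.
        MarshalledWritable<Configuration> mConfig = new MarshalledWritable<Configuration>();
        mConfig.set(hadoopJob.getConfiguration());

        JobSpecification spec = new JobSpecification();
        IOperatorDescriptor mapper = new MapperOperatorDescriptor<LongWritable, Text, Text, IntWritable>(
                spec, jobId, mConfig, ispFactory);
        IOperatorDescriptor reducer = new ReducerOperatorDescriptor<Text, IntWritable, Text, IntWritable>(
                spec, jobId, mConfig);

        // Map output is hash-partitioned by the job's Partitioner; each reducer
        // reads its partitions back through a ShuffleFrameReader.
        IConnectorDescriptor shuffle = new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);
        spec.connect(shuffle, mapper, 0, reducer, 0);
        spec.addRoot(reducer);
        return spec;
    }
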
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1244 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
index e621a09..f79a464 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -17,10 +17,9 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 public interface IInputChannel {
-    public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException;
+    public void registerMonitor(IInputChannelMonitor monitor);
 
     public void setAttachment(Object attachment);
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index ae2cd37..609d7f0 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
+    public void registerMonitor(IInputChannelMonitor monitor) {
         this.monitor = monitor;
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index b742316..0ea3124 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -23,7 +23,6 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
@@ -55,7 +54,7 @@
     }
 
     @Override
-    public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
+    public void registerMonitor(IInputChannelMonitor monitor) {
         this.monitor = monitor;
     }
 
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
new file mode 100644
index 0000000..e8b6e8b
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+
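+/**
+ * Wraps a marshalled Hadoop Configuration and exposes the job's key/value
+ * classes, formats, comparators, and partitioner to Hyracks operators,
+ * swapping in the right classloader around Hadoop's reflective calls.
+ */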
+public class HadoopHelper {
+    public static final int KEY_FIELD_INDEX = 0;
+    public static final int VALUE_FIELD_INDEX = 1;
+    public static final int BLOCKID_FIELD_INDEX = 2;
+    private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
+
+    private MarshalledWritable<Configuration> mConfig;
+    private Configuration config;
+    private Job job;
+
+    public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.mConfig = mConfig;
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            config = mConfig.get();
+            config.setClassLoader(getClass().getClassLoader());
+            job = new Job(config);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
+        try {
+            return new RecordDescriptor(
+                    new ISerializerDeserializer[] {
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputKeyClass()),
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
+
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
+        try {
+            return new RecordDescriptor(
+                    new ISerializerDeserializer[] {
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputKeyClass()),
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputValueClass()) });
+
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(config.getClassLoader());
+            return new TaskAttemptContext(config, taId);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public JobContext createJobContext() {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(config.getClassLoader());
+            return new JobContext(config, null);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
+        try {
+            return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
+        try {
+            return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
+        try {
+            return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
+        try {
+            return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            InputFormat<K, V> fmt = getInputFormat();
+            JobContext jCtx = new JobContext(config, null);
+            try {
+                return fmt.getSplits(jCtx);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public IBinaryComparatorFactory[] getSortComparatorFactories() {
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+                .getSortComparator().getClass());
+
+        return new IBinaryComparatorFactory[] { comparatorFactory };
+    }
+
+    public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+                .getGroupingComparator().getClass());
+
+        return new IBinaryComparatorFactory[] { comparatorFactory };
+    }
+
+    public RawComparator<?> getRawGroupingComparator() {
+        return job.getGroupingComparator();
+    }
+
+    public int getSortFrameLimit(IHyracksCommonContext ctx) {
+        int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
+        return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
+    }
+
+    public Job getJob() {
+        return job;
+    }
+
+    public MarshalledWritable<Configuration> getMarshalledConfiguration() {
+        return mConfig;
+    }
+
+    public Configuration getConfiguration() {
+        return config;
+    }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
+        int nReducers = job.getNumReduceTasks();
+        try {
+            return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
+                    (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public int[] getSortFields() {
+        return KEY_SORT_FIELDS;
+    }
+
+    public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
+        return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                .getMapOutputKeyClass());
+    }
+
+    public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
+        return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                .getMapOutputValueClass());
+    }
+
+    public FileSystem getFilesystem() throws HyracksDataException {
+        try {
+            return FileSystem.get(config);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
+        try {
+            return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public boolean hasCombiner() throws HyracksDataException {
+        try {
+            return job.getCombinerClass() != null;
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
new file mode 100644
index 0000000..7e4d67f
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
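+/**
+ * Reflection helpers that instantiate user classes with this module's
+ * classloader installed as the thread context classloader.
+ */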
+public class HadoopTools {
+    public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
+            IllegalAccessException {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
+            Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
+            return newInstance(clazz);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
+        return clazz.newInstance();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
new file mode 100644
index 0000000..7c6bf86
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.util.BitSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.connectors.PartitionDataWriter;
+
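+/**
+ * An M-to-N connector that hash-partitions map output on the send side using
+ * the job's Partitioner and reassembles each receiver's input frames through
+ * a ShuffleFrameReader on the receive side.
+ */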
+public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final MarshalledWritable<Configuration> mConfig;
+
+    public HashPartitioningShuffleConnectorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+        super(spec);
+        this.mConfig = mConfig;
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        HadoopHelper helper = new HadoopHelper(mConfig);
+        ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        BitSet expectedPartitions = new BitSet();
+        expectedPartitions.set(0, nProducerPartitions);
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
+                expectedPartitions);
+        IFrameReader frameReader = new ShuffleFrameReader(ctx, channelReader, mConfig);
+        return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
+                channelReader);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
new file mode 100644
index 0000000..1545c06
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputSplitProvider {
+    public InputSplit next() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
new file mode 100644
index 0000000..73588ab
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputSplitProviderFactory extends Serializable {
+    public IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
new file mode 100644
index 0000000..b37084e
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
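+/**
+ * A Writable input split: a file region plus a block id and schedule time,
+ * convertible to a plain FileSplit for the underlying record reader.
+ */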
+public class InputFileSplit extends InputSplit implements Writable {
+    private Path file;
+    private long start;
+    private long length;
+    private int blockId;
+    private String[] hosts;
+    private long scheduleTime;
+
+    public InputFileSplit() {
+    }
+
+    public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long scheduleTime) {
+        this.blockId = blockId;
+        this.file = file;
+        this.start = start;
+        this.length = length;
+        this.hosts = hosts;
+        this.scheduleTime = scheduleTime;
+    }
+
+    public int blockId() {
+        return blockId;
+    }
+
+    public long scheduleTime() {
+        return this.scheduleTime;
+    }
+
+    public Path getPath() {
+        return file;
+    }
+
+    /** The position of the first byte in the file to process. */
+    public long getStart() {
+        return start;
+    }
+
+    /** The number of bytes in the file to process. */
+    @Override
+    public long getLength() {
+        return length;
+    }
+
+    @Override
+    public String toString() {
+        return file + ":" + start + "+" + length;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, file.toString());
+        out.writeLong(start);
+        out.writeLong(length);
+        out.writeInt(blockId);
+        out.writeLong(this.scheduleTime);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        file = new Path(Text.readString(in));
+        start = in.readLong();
+        length = in.readLong();
+        hosts = null;
+        this.blockId = in.readInt();
+        this.scheduleTime = in.readLong();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+        if (this.hosts == null) {
+            return new String[] {};
+        } else {
+            return this.hosts;
+        }
+    }
+
+    public FileSplit toFileSplit() {
+        return new FileSplit(file, start, length, hosts);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
new file mode 100644
index 0000000..a00fa7f
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.util.Progress;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
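+/**
+ * Presents the (key, value) tuples stored in a list of Hyracks frames as a
+ * Hadoop RawKeyValueIterator for consumption by a Reducer.
+ */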
+public class KVIterator implements RawKeyValueIterator {
+    private final HadoopHelper helper;
+    private FrameTupleAccessor accessor;
+    private DataInputBuffer kBuffer;
+    private DataInputBuffer vBuffer;
+    private List<ByteBuffer> buffers;
+    private int bSize;
+    private int bPtr;
+    private int tIdx;
+    private boolean eog;
+
+    public KVIterator(IHyracksTaskContext ctx, HadoopHelper helper, RecordDescriptor recordDescriptor) {
+        this.helper = helper;
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        kBuffer = new DataInputBuffer();
+        vBuffer = new DataInputBuffer();
+    }
+
+    void reset(List<ByteBuffer> buffers, int bSize) {
+        this.buffers = buffers;
+        this.bSize = bSize;
+        bPtr = 0;
+        tIdx = 0;
+        eog = false;
+        if (bSize > 0) {
+            accessor.reset(buffers.get(0));
+            tIdx = -1;
+        } else {
+            eog = true;
+        }
+    }
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+        return kBuffer;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+        return vBuffer;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        while (true) {
+            if (eog) {
+                return false;
+            }
+            ++tIdx;
+            if (accessor.getTupleCount() <= tIdx) {
+                ++bPtr;
+                if (bPtr >= bSize) {
+                    eog = true;
+                    continue;
+                }
+                tIdx = -1;
+                accessor.reset(buffers.get(bPtr));
+                continue;
+            }
+            kBuffer.reset(accessor.getBuffer().array(),
+                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.KEY_FIELD_INDEX),
+                    accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
+            vBuffer.reset(accessor.getBuffer().array(),
+                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.VALUE_FIELD_INDEX),
+                    accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
+            break;
+        }
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Progress getProgress() {
+        return null;
+    }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
new file mode 100644
index 0000000..4a61296
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
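+/**
+ * Runs the Hadoop Mapper over this partition's input splits. Each block of
+ * map output is sorted (with an optional combiner pass) and tagged with its
+ * block id before being emitted to the shuffle.
+ */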
+public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final int jobId;
+    private final MarshalledWritable<Configuration> config;
+    private final IInputSplitProviderFactory factory;
+
+    public MapperOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> config,
+            IInputSplitProviderFactory factory) throws HyracksDataException {
+        super(spec, 0, 1);
+        this.jobId = jobId;
+        this.config = config;
+        this.factory = factory;
+        HadoopHelper helper = new HadoopHelper(config);
+        recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final HadoopHelper helper = new HadoopHelper(config);
+        final Configuration conf = helper.getConfiguration();
+        final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
+        final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
+        final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
+        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+
+        final int framesLimit = helper.getSortFrameLimit(ctx);
+        final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+
+        class SortingRecordWriter extends RecordWriter<K2, V2> {
+            private final ArrayTupleBuilder tb;
+            private final ByteBuffer frame;
+            private final FrameTupleAppender fta;
+            private ExternalSortRunGenerator runGen;
+            private int blockId;
+
+            public SortingRecordWriter() throws HyracksDataException {
+                tb = new ArrayTupleBuilder(2);
+                frame = ctx.allocateFrame();
+                fta = new FrameTupleAppender(ctx.getFrameSize());
+                fta.reset(frame, true);
+            }
+
+            public void initBlock(int blockId) throws HyracksDataException {
+                runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit);
+                this.blockId = blockId;
+            }
+
+            @Override
+            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+            }
+
+            @Override
+            public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                DataOutput dos = tb.getDataOutput();
+                tb.reset();
+                key.write(dos);
+                tb.addFieldEndOffset();
+                value.write(dos);
+                tb.addFieldEndOffset();
+                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                    if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+
+            public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+                if (fta.getTupleCount() > 0) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                }
+                runGen.close();
+                IFrameWriter delegatingWriter = new IFrameWriter() {
+                    private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                    private final ByteBuffer outFrame = ctx.allocateFrame();
+                    private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(),
+                            helper.getMapOutputRecordDescriptorWithoutExtraFields());
+                    private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
+
+                    @Override
+                    public void open() throws HyracksDataException {
+                        appender.reset(outFrame, true);
+                    }
+
+                    @Override
+                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        fta.reset(buffer);
+                        int n = fta.getTupleCount();
+                        for (int i = 0; i < n; ++i) {
+                            tb.reset();
+                            tb.addField(fta, i, 0);
+                            tb.addField(fta, i, 1);
+                            try {
+                                tb.getDataOutput().writeInt(blockId);
+                            } catch (IOException e) {
+                                throw new HyracksDataException(e);
+                            }
+                            tb.addFieldEndOffset();
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                FrameUtils.flushFrame(outFrame, writer);
+                                appender.reset(outFrame, true);
+                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException();
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void close() throws HyracksDataException {
+                        if (appender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                        }
+                    }
+
+                    @Override
+                    public void fail() throws HyracksDataException {
+                        // No partial state to clean up on failure.
+
+                    }
+                };
+                if (helper.hasCombiner()) {
+                    Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
+                    TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
+                    TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
+                    final IFrameWriter outputWriter = delegatingWriter;
+                    RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
+                        private final FrameTupleAppender fta = new FrameTupleAppender(ctx.getFrameSize());
+                        private final ByteBuffer buffer = ctx.allocateFrame();
+                        private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+
+                        {
+                            fta.reset(buffer, true);
+                            outputWriter.open();
+                        }
+
+                        @Override
+                        public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                            DataOutput dos = tb.getDataOutput();
+                            tb.reset();
+                            key.write(dos);
+                            tb.addFieldEndOffset();
+                            value.write(dos);
+                            tb.addFieldEndOffset();
+                            if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                FrameUtils.flushFrame(buffer, outputWriter);
+                                fta.reset(buffer, true);
+                                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException();
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+                            if (fta.getTupleCount() > 0) {
+                                FrameUtils.flushFrame(buffer, outputWriter);
+                            }
+                            // Close unconditionally: the writer was opened even if no tuples arrived.
+                            outputWriter.close();
+                        }
+                    };
+                    delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
+                            new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
+                            helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
+                            ctaskAttemptContext);
+                }
+                IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                for (int i = 0; i < comparatorFactories.length; ++i) {
+                    comparators[i] = comparatorFactories[i].createBinaryComparator();
+                }
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+                        runGen.getRuns(), new int[] { 0 }, comparators,
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+                merger.process();
+            }
+        }
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                writer.open();
+                try {
+                    SortingRecordWriter recordWriter = new SortingRecordWriter();
+                    InputSplit split = null;
+                    int blockId = 0;
+                    while ((split = isp.next()) != null) {
+                        try {
+                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
+                                    taskAttemptContext);
+                            ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+                            try {
+                                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                                recordReader.initialize(split, taskAttemptContext);
+                            } finally {
+                                Thread.currentThread().setContextClassLoader(ctxCL);
+                            }
+                            recordWriter.initBlock(blockId);
+                            Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, taId, recordReader,
+                                    recordWriter, null, null, split);
+                            mapper.run(mCtx);
+                            recordReader.close();
+                            recordWriter.sortAndFlushBlock(writer);
+                            ++blockId;
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        } catch (InterruptedException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
new file mode 100644
index 0000000..be05f22
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
+public class MarshalledWritable<T extends Writable> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private byte[] bytes;
+
+    public MarshalledWritable() {
+    }
+
+    public void set(T o) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeUTF(o.getClass().getName());
+        o.write(dos);
+        dos.close();
+        bytes = baos.toByteArray();
+    }
+
+    public T get() throws Exception {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        DataInputStream dis = new DataInputStream(bais);
+        String className = dis.readUTF();
+        T o = (T) HadoopTools.newInstance(className);
+        o.readFields(dis);
+        return o;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
new file mode 100644
index 0000000..33d58af
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
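+/**
+ * An IFrameWriter that detects key-group boundaries across sorted input
+ * frames and runs the wrapped Reducer over each group through a KVIterator.
+ */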
+public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
+    private final IHyracksTaskContext ctx;
+    private final HadoopHelper helper;
+    private final int[] groupFields;
+    private final FrameTupleAccessor accessor0;
+    private final FrameTupleAccessor accessor1;
+    private final ByteBuffer copyFrame;
+    private final IBinaryComparator[] comparators;
+    private final KVIterator kvi;
+    private final Reducer<K2, V2, K3, V3> reducer;
+    private final RecordWriter<K3, V3> recordWriter;
+    private final TaskAttemptID taId;
+    private final TaskAttemptContext taskAttemptContext;
+
+    private boolean first;
+    private boolean groupStarted;
+    private List<ByteBuffer> group;
+    private int bPtr;
+    private FrameTupleAppender fta;
+    private Counter keyCounter;
+    private Counter valueCounter;
+
+    public ReduceWriter(IHyracksTaskContext ctx, HadoopHelper helper, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+            Reducer<K2, V2, K3, V3> reducer, RecordWriter<K3, V3> recordWriter, TaskAttemptID taId,
+            TaskAttemptContext taskAttemptContext) {
+        this.ctx = ctx;
+        this.helper = helper;
+        this.groupFields = groupFields;
+        accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        copyFrame = ctx.allocateFrame();
+        accessor1.reset(copyFrame);
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.reducer = reducer;
+        this.recordWriter = recordWriter;
+        this.taId = taId;
+        this.taskAttemptContext = taskAttemptContext;
+
+        kvi = new KVIterator(ctx, helper, recordDescriptor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        first = true;
+        groupStarted = false;
+        group = new ArrayList<ByteBuffer>();
+        bPtr = 0;
+        group.add(ctx.allocateFrame());
+        fta = new FrameTupleAppender(ctx.getFrameSize());
+        keyCounter = new Counter() {
+        };
+        valueCounter = new Counter() {
+        };
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor0.reset(buffer);
+        int nTuples = accessor0.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+                groupInit();
+                first = false;
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
+                } else {
+                    switchGroupIfRequired(accessor0, i - 1, accessor0, i);
+                }
+            }
+            accumulate(accessor0, i);
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    private void accumulate(FrameTupleAccessor accessor, int tIndex) {
+        if (!fta.append(accessor, tIndex)) {
+            ++bPtr;
+            if (group.size() <= bPtr) {
+                group.add(ctx.allocateFrame());
+            }
+            fta.reset(group.get(bPtr), true);
+            if (!fta.append(accessor, tIndex)) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            reduce();
+            groupInit();
+        }
+    }
+
+    private void groupInit() {
+        groupStarted = true;
+        bPtr = 0;
+        fta.reset(group.get(0), true);
+    }
+
+    private void reduce() throws HyracksDataException {
+        kvi.reset(group, bPtr + 1);
+        try {
+            Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), taId, kvi,
+                    keyCounter, valueCounter, recordWriter, null, null,
+                    (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
+                            .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
+            reducer.run(rCtx);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        groupStarted = false;
+    }
+
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (groupStarted) {
+            reduce();
+        }
+        try {
+            recordWriter.close(taskAttemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
new file mode 100644
index 0000000..bbb0d74
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
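+/**
+ * Drives the Hadoop Reducer over shuffled, sorted frames via a ReduceWriter,
+ * writing final records through the job's OutputFormat.
+ */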
+public class ReducerOperatorDescriptor<K2 extends Writable, V2 extends Writable, K3 extends Writable, V3 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final int jobId;
+
+    private final MarshalledWritable<Configuration> mConfig;
+
+    public ReducerOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> mConfig) {
+        super(spec, 1, 0);
+        this.jobId = jobId;
+        this.mConfig = mConfig;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final HadoopHelper helper = new HadoopHelper(mConfig);
+        final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
+        final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
+        final int[] groupFields = helper.getSortFields();
+        IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
+
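+        // Synthesize a TaskAttemptID for this reduce partition; "foo" is a
+        // dummy JobTracker identifier and 'false' marks this as a reduce
+        // (non-map) task.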
+        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, false, partition, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+        final RecordWriter recordWriter;
+        try {
+            recordWriter = helper.getOutputFormat().getRecordWriter(taskAttemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+
+        final ReduceWriter<K2, V2, K3, V3> rw = new ReduceWriter<K2, V2, K3, V3>(ctx, helper, groupFields,
+                groupingComparators, recordDescriptor, reducer, recordWriter, taId, taskAttemptContext);
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+                rw.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                rw.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                rw.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
new file mode 100644
index 0000000..8e03eae
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
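+/**
+ * Reads shuffled map output from all sender partitions, spills it into run
+ * files (split whenever a sender's block id changes), and then merges the
+ * runs with the job's sort comparators into a single ordered stream.
+ */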
+public class ShuffleFrameReader implements IFrameReader {
+    private final IHyracksTaskContext ctx;
+    private final NonDeterministicChannelReader channelReader;
+    private final HadoopHelper helper;
+    private final RecordDescriptor recordDescriptor;
+    private List<RunFileWriter> runFileWriters;
+    private RunFileReader reader;
+
+    public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
+            MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.ctx = ctx;
+        this.channelReader = channelReader;
+        helper = new HadoopHelper(mConfig);
+        this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        channelReader.open();
+        int nSenders = channelReader.getSenderPartitionCount();
+        runFileWriters = new ArrayList<RunFileWriter>();
+        RunInfo[] infos = new RunInfo[nSenders];
+        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        IInputChannel[] channels = channelReader.getChannels();
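+        // Phase 1: drain frames from whichever sender has data, appending
+        // each tuple to that sender's current run and starting a new run
+        // file whenever the tuple's block id changes.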
+        while (true) {
+            int entry = channelReader.findNextSender();
+            if (entry < 0) {
+                break;
+            }
+            RunInfo info = infos[entry];
+            IInputChannel channel = channels[entry];
+            ByteBuffer netBuffer = channel.getNextBuffer();
+            accessor.reset(netBuffer);
+            int nTuples = accessor.getTupleCount();
+            for (int i = 0; i < nTuples; ++i) {
+                int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+                if (info == null) {
+                    info = new RunInfo();
+                    info.reset(tBlockId);
+                    infos[entry] = info;
+                } else if (info.blockId != tBlockId) {
+                    info.close();
+                    info.reset(tBlockId);
+                }
+                info.write(accessor, i);
+            }
+            channel.recycleBuffer(netBuffer);
+        }
+        for (int i = 0; i < infos.length; ++i) {
+            RunInfo info = infos[i];
+            if (info != null) {
+                info.close();
+            }
+        }
+        infos = null;
+
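+        // Phase 2: merge all spilled runs into one run file using the
+        // job's sort comparators; the merged file backs nextFrame().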
+        FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+        int framesLimit = helper.getSortFrameLimit(ctx);
+        IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        List<IFrameReader> runs = new LinkedList<IFrameReader>();
+        for (RunFileWriter rfw : runFileWriters) {
+            runs.add(rfw.createReader());
+        }
+        RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+                recordDescriptor, framesLimit, rfw);
+        merger.process();
+
+        reader = rfw.createReader();
+        reader.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        return reader.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        reader.close();
+    }
+
+    private class RunInfo {
+        private final ByteBuffer buffer;
+        private final FrameTupleAppender fta;
+
+        private FileReference file;
+        private RunFileWriter rfw;
+        private int blockId;
+
+        public RunInfo() {
+            buffer = ctx.allocateFrame();
+            fta = new FrameTupleAppender(ctx.getFrameSize());
+        }
+
+        public void reset(int blockId) throws HyracksDataException {
+            this.blockId = blockId;
+            fta.reset(buffer, true);
+            try {
+                file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+                rfw = new RunFileWriter(file, ctx.getIOManager());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
+            if (!fta.append(accessor, tIdx)) {
+                flush();
+                if (!fta.append(accessor, tIdx)) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        public void close() throws HyracksDataException {
+            flush();
+            rfw.close();
+            runFileWriters.add(rfw);
+        }
+
+        private void flush() throws HyracksDataException {
+            if (fta.getTupleCount() <= 0) {
+                return;
+            }
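+            // Rewind the frame and expose its full capacity before spilling
+            // it to the run file, then clear the appender for reuse.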
+            buffer.limit(buffer.capacity());
+            buffer.position(0);
+            rfw.nextFrame(buffer);
+            fta.reset(buffer, true);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionAcceptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionAcceptor.java
new file mode 100644
index 0000000..8942ea9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionAcceptor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
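+/**
+ * Receives input channels for sender partitions as they become available.
+ */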
+public interface IPartitionAcceptor {
+    public void addPartition(PartitionId pid, IInputChannel channel);
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionBatchManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionBatchManager.java
new file mode 100644
index 0000000..4962d64
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionBatchManager.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
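+/**
+ * An IPartitionAcceptor that additionally hands out batches of partition
+ * readers on demand.
+ */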
+public interface IPartitionBatchManager extends IPartitionAcceptor {
+    public void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
new file mode 100644
index 0000000..713fb0e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
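+/**
+ * Adapts a single IInputChannel to the IFrameReader interface; nextFrame()
+ * blocks until the channel's monitor callbacks report data, end-of-stream,
+ * or failure.
+ */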
+public class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
+    private final IInputChannel channel;
+
+    private int availableFrames;
+
+    private boolean eos;
+
+    private boolean failed;
+
+    public InputChannelFrameReader(IInputChannel channel) {
+        this.channel = channel;
+        availableFrames = 0;
+        eos = false;
+        failed = false;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
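+        // Block until the monitor callbacks signal an available frame,
+        // end-of-stream, or a failure.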
+        synchronized (this) {
+            while (!failed && !eos && availableFrames <= 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            if (failed) {
+                throw new HyracksDataException("Failure occurred on input");
+            }
+            if (availableFrames <= 0 && eos) {
+                return false;
+            }
+            --availableFrames;
+        }
+        ByteBuffer srcBuffer = channel.getNextBuffer();
+        FrameUtils.copy(srcBuffer, buffer);
+        channel.recycleBuffer(srcBuffer);
+        return true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    @Override
+    public synchronized void notifyFailure(IInputChannel channel) {
+        failed = true;
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+        availableFrames += nFrames;
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyEndOfStream(IInputChannel channel) {
+        eos = true;
+        notifyAll();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
new file mode 100644
index 0000000..578bfec
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
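+/**
+ * Tracks frame availability, end-of-stream, and failures across a set of
+ * input channels and hands out the index of the next sender that has a
+ * frame ready.
+ */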
+public class NonDeterministicChannelReader implements IInputChannelMonitor, IPartitionAcceptor {
+    private static final Logger LOGGER = Logger.getLogger(NonDeterministicChannelReader.class.getName());
+
+    private final int nSenderPartitions;
+
+    private final IInputChannel[] channels;
+
+    private final BitSet frameAvailability;
+
+    private final int[] availableFrameCounts;
+
+    private final BitSet eosSenders;
+
+    private final BitSet failSenders;
+
+    private final BitSet closedSenders;
+
+    private int lastReadSender;
+
+    public NonDeterministicChannelReader(int nSenderPartitions, BitSet expectedPartitions) {
+        this.nSenderPartitions = nSenderPartitions;
+        channels = new IInputChannel[nSenderPartitions];
+        eosSenders = new BitSet(nSenderPartitions);
+        failSenders = new BitSet(nSenderPartitions);
+        closedSenders = new BitSet(nSenderPartitions);
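+        // Senders that are not expected are marked closed up front, so only
+        // the expected partitions are ever waited on.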
+        closedSenders.or(expectedPartitions);
+        closedSenders.flip(0, nSenderPartitions);
+        frameAvailability = new BitSet(nSenderPartitions);
+        availableFrameCounts = new int[nSenderPartitions];
+    }
+
+    @Override
+    public void addPartition(PartitionId pid, IInputChannel channel) {
+        channel.registerMonitor(this);
+        channel.setAttachment(pid);
+        synchronized (this) {
+            channels[pid.getSenderIndex()] = channel;
+        }
+    }
+
+    public int getSenderPartitionCount() {
+        return nSenderPartitions;
+    }
+
+    public void open() throws HyracksDataException {
+        lastReadSender = 0;
+    }
+
+    public IInputChannel[] getChannels() {
+        return channels;
+    }
+
+    public synchronized int findNextSender() throws HyracksDataException {
+        while (true) {
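+            // Find the next sender with an available frame, preferring
+            // indices after the last one read and wrapping around (the
+            // switch intentionally falls through from default to case 0).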
+            switch (lastReadSender) {
+                default:
+                    lastReadSender = frameAvailability.nextSetBit(lastReadSender + 1);
+                    if (lastReadSender >= 0) {
+                        break;
+                    }
+                case 0:
+                    lastReadSender = frameAvailability.nextSetBit(0);
+            }
+            if (lastReadSender >= 0) {
+                assert availableFrameCounts[lastReadSender] > 0;
+                if (--availableFrameCounts[lastReadSender] == 0) {
+                    frameAvailability.clear(lastReadSender);
+                }
+                return lastReadSender;
+            }
+            if (!failSenders.isEmpty()) {
+                throw new HyracksDataException("Failure occurred on input");
+            }
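+            // Close senders that reached end-of-stream; nextSetBit(i) is
+            // safe (no infinite loop) because bit i is cleared in the body.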
+            for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
+                channels[i].close();
+                eosSenders.clear(i);
+                closedSenders.set(i);
+            }
+            int nextClosedBitIndex = closedSenders.nextClearBit(0);
+            if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
+                lastReadSender = -1;
+                return lastReadSender;
+            }
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    public synchronized void close() throws HyracksDataException {
+        for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
+                .nextClearBit(i + 1)) {
+            if (channels[i] != null) {
+                channels[i].close();
+                channels[i] = null;
+            }
+        }
+    }
+
+    @Override
+    public synchronized void notifyFailure(IInputChannel channel) {
+        PartitionId pid = (PartitionId) channel.getAttachment();
+        int senderIndex = pid.getSenderIndex();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Failure: " + pid.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: "
+                    + pid.getReceiverIndex());
+        }
+        failSenders.set(senderIndex);
+        eosSenders.set(senderIndex);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+        PartitionId pid = (PartitionId) channel.getAttachment();
+        int senderIndex = pid.getSenderIndex();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Data available: " + pid.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: "
+                    + pid.getReceiverIndex());
+        }
+        availableFrameCounts[senderIndex] += nFrames;
+        frameAvailability.set(senderIndex);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyEndOfStream(IInputChannel channel) {
+        PartitionId pid = (PartitionId) channel.getAttachment();
+        int senderIndex = pid.getSenderIndex();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("EOS: " + pid);
+        }
+        eosSenders.set(senderIndex);
+        notifyAll();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
new file mode 100644
index 0000000..657165c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class NonDeterministicFrameReader implements IFrameReader {
+    private final NonDeterministicChannelReader channelReader;
+
+    public NonDeterministicFrameReader(NonDeterministicChannelReader channelReader) {
+        this.channelReader = channelReader;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        channelReader.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        int index = channelReader.findNextSender();
+        if (index >= 0) {
+            IInputChannel[] channels = channelReader.getChannels();
+            ByteBuffer srcFrame = channels[index].getNextBuffer();
+            FrameUtils.copy(srcFrame, buffer);
+            channels[index].recycleBuffer(srcFrame);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized void close() throws HyracksDataException {
+        channelReader.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionBatchManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionBatchManager.java
new file mode 100644
index 0000000..ba25850
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionBatchManager.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
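+/**
+ * Hands partitions out in arrival order: channels that arrive while a
+ * batch request is pending are added to it directly; otherwise they are
+ * queued for the next getNextBatch() call.
+ */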
+public class NonDeterministicPartitionBatchManager implements IPartitionBatchManager {
+    private final IInputChannel[] channels;
+
+    private List<IFrameReader> partitions;
+
+    private List<IFrameReader> batch;
+
+    private int requiredSize;
+
+    public NonDeterministicPartitionBatchManager(int nSenders) {
+        channels = new IInputChannel[nSenders];
+        partitions = new ArrayList<IFrameReader>();
+    }
+
+    @Override
+    public synchronized void addPartition(PartitionId pid, IInputChannel channel) {
+        channels[pid.getSenderIndex()] = channel;
+        InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
+        channel.registerMonitor(channelReader);
+        if (batch != null && batch.size() < requiredSize) {
+            batch.add(channelReader);
+            if (batch.size() == requiredSize) {
+                notifyAll();
+            }
+        } else {
+            partitions.add(channelReader);
+        }
+    }
+
+    @Override
+    public synchronized void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException {
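+        // Serve the request from already-arrived partitions first, then
+        // block until addPartition() has filled the batch to 'size'.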
+        if (partitions.size() <= size) {
+            batch.addAll(partitions);
+            partitions.clear();
+        } else {
+            List<IFrameReader> sublist = partitions.subList(0, size);
+            batch.addAll(sublist);
+            sublist.clear();
+        }
+        if (batch.size() == size) {
+            return;
+        }
+        this.batch = batch;
+        this.requiredSize = size;
+        while (batch.size() < size) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        this.batch = null;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
deleted file mode 100644
index af2670b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.collectors;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class NonDeterministicPartitionCollector extends AbstractPartitionCollector {
-    private static final Logger LOGGER = Logger.getLogger(NonDeterministicPartitionCollector.class.getName());
-
-    private final FrameReader reader;
-
-    private final BitSet expectedPartitions;
-
-    private final int nSenderPartitions;
-
-    private final IInputChannel[] channels;
-
-    private final BitSet frameAvailability;
-
-    private final int[] availableFrameCounts;
-
-    private final BitSet eosSenders;
-
-    private final BitSet failSenders;
-
-    private BitSet closedSenders;
-
-    private int lastReadSender;
-
-    public NonDeterministicPartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId,
-            int receiverIndex, int nSenderPartitions, BitSet expectedPartitions) {
-        super(ctx, connectorId, receiverIndex);
-        this.expectedPartitions = expectedPartitions;
-        this.nSenderPartitions = nSenderPartitions;
-        reader = new FrameReader();
-        channels = new IInputChannel[nSenderPartitions];
-        eosSenders = new BitSet(nSenderPartitions);
-        failSenders = new BitSet(nSenderPartitions);
-        closedSenders = new BitSet(nSenderPartitions);
-        closedSenders.or(expectedPartitions);
-        closedSenders.flip(0, nSenderPartitions);
-        frameAvailability = new BitSet(nSenderPartitions);
-        availableFrameCounts = new int[nSenderPartitions];
-    }
-
-    @Override
-    public void open() throws HyracksException {
-        lastReadSender = 0;
-    }
-
-    @Override
-    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
-        for (PartitionChannel pc : partitions) {
-            PartitionId pid = pc.getPartitionId();
-            IInputChannel channel = pc.getInputChannel();
-            channel.setAttachment(pid);
-            channel.registerMonitor(reader);
-            synchronized (this) {
-                channels[pid.getSenderIndex()] = channel;
-            }
-            channel.open();
-        }
-    }
-
-    @Override
-    public IFrameReader getReader() throws HyracksException {
-        return reader;
-    }
-
-    @Override
-    public void close() throws HyracksException {
-    }
-
-    private final class FrameReader implements IFrameReader, IInputChannelMonitor {
-        @Override
-        public void open() throws HyracksDataException {
-        }
-
-        @Override
-        public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            findNextSender();
-            if (lastReadSender >= 0) {
-                ByteBuffer srcFrame = channels[lastReadSender].getNextBuffer();
-                FrameUtils.copy(srcFrame, buffer);
-                channels[lastReadSender].recycleBuffer(srcFrame);
-                return true;
-            }
-            return false;
-        }
-
-        private void findNextSender() throws HyracksDataException {
-            synchronized (NonDeterministicPartitionCollector.this) {
-                while (true) {
-                    switch (lastReadSender) {
-                        default:
-                            lastReadSender = frameAvailability.nextSetBit(lastReadSender + 1);
-                            if (lastReadSender >= 0) {
-                                break;
-                            }
-                        case 0:
-                            lastReadSender = frameAvailability.nextSetBit(0);
-                    }
-                    if (lastReadSender >= 0) {
-                        assert availableFrameCounts[lastReadSender] > 0;
-                        if (--availableFrameCounts[lastReadSender] == 0) {
-                            frameAvailability.clear(lastReadSender);
-                        }
-                        return;
-                    }
-                    if (!failSenders.isEmpty()) {
-                        throw new HyracksDataException("Failure occurred on input");
-                    }
-                    for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
-                        channels[i].close();
-                        eosSenders.clear(i);
-                        closedSenders.set(i);
-                    }
-                    int nextClosedBitIndex = closedSenders.nextClearBit(0);
-                    if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
-                        lastReadSender = -1;
-                        return;
-                    }
-                    try {
-                        NonDeterministicPartitionCollector.this.wait();
-                    } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            synchronized (NonDeterministicPartitionCollector.this) {
-                for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
-                        .nextClearBit(i + 1)) {
-                    if (channels[i] != null) {
-                        channels[i].close();
-                        channels[i] = null;
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void notifyFailure(IInputChannel channel) {
-            PartitionId pid = (PartitionId) channel.getAttachment();
-            int senderIndex = pid.getSenderIndex();
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Failure: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
-            }
-            synchronized (NonDeterministicPartitionCollector.this) {
-                failSenders.set(senderIndex);
-                eosSenders.set(senderIndex);
-                NonDeterministicPartitionCollector.this.notifyAll();
-            }
-        }
-
-        @Override
-        public void notifyDataAvailability(IInputChannel channel, int nFrames) {
-            PartitionId pid = (PartitionId) channel.getAttachment();
-            int senderIndex = pid.getSenderIndex();
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Data available: " + connectorId + " sender: " + senderIndex + " receiver: "
-                        + receiverIndex);
-            }
-            synchronized (NonDeterministicPartitionCollector.this) {
-                availableFrameCounts[senderIndex] += nFrames;
-                frameAvailability.set(senderIndex);
-                NonDeterministicPartitionCollector.this.notifyAll();
-            }
-        }
-
-        @Override
-        public void notifyEndOfStream(IInputChannel channel) {
-            PartitionId pid = (PartitionId) channel.getAttachment();
-            int senderIndex = pid.getSenderIndex();
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("EOS: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
-            }
-            synchronized (NonDeterministicPartitionCollector.this) {
-                eosSenders.set(senderIndex);
-                NonDeterministicPartitionCollector.this.notifyAll();
-            }
-        }
-    }
-
-    @Override
-    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
-        Collection<PartitionId> c = new ArrayList<PartitionId>(expectedPartitions.cardinality());
-        for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i + 1)) {
-            c.add(new PartitionId(getJobId(), getConnectorId(), i, getReceiverIndex()));
-        }
-        return c;
-    }
-
-    @Override
-    public void abort() {
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
new file mode 100644
index 0000000..da5b0bc
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
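+/**
+ * Partition collector that forwards incoming partition channels to an
+ * IPartitionAcceptor and exposes a caller-supplied IFrameReader as the
+ * view of the collected input.
+ */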
+public class PartitionCollector extends AbstractPartitionCollector {
+    private final BitSet expectedPartitions;
+
+    private final IFrameReader frameReader;
+
+    private final IPartitionAcceptor pa;
+
+    public PartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
+            BitSet expectedPartitions, IFrameReader frameReader, IPartitionAcceptor pa) {
+        super(ctx, connectorId, receiverIndex);
+        this.expectedPartitions = expectedPartitions;
+        this.frameReader = frameReader;
+        this.pa = pa;
+    }
+
+    @Override
+    public void open() throws HyracksException {
+    }
+
+    @Override
+    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+        for (PartitionChannel pc : partitions) {
+            PartitionId pid = pc.getPartitionId();
+            IInputChannel channel = pc.getInputChannel();
+            pa.addPartition(pid, channel);
+            channel.open();
+        }
+    }
+
+    @Override
+    public IFrameReader getReader() throws HyracksException {
+        return frameReader;
+    }
+
+    @Override
+    public void close() throws HyracksException {
+
+    }
+
+    @Override
+    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+        Collection<PartitionId> c = new ArrayList<PartitionId>(expectedPartitions.cardinality());
+        for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i + 1)) {
+            c.add(new PartitionId(getJobId(), getConnectorId(), i, getReceiverIndex()));
+        }
+        return c;
+    }
+
+    @Override
+    public void abort() {
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
new file mode 100644
index 0000000..5f41069
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
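+/**
+ * Merges the sorted streams of all sender partitions into a single sorted
+ * stream with a RunMergingFrameReader.
+ */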
+public class SortMergeFrameReader implements IFrameReader {
+    private final IHyracksTaskContext ctx;
+    private final int maxConcurrentMerges;
+    private final int nSenders;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final IPartitionBatchManager pbm;
+
+    private RunMergingFrameReader merger;
+
+    public SortMergeFrameReader(IHyracksTaskContext ctx, int maxConcurrentMerges, int nSenders, int[] sortFields,
+            IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, IPartitionBatchManager pbm) {
+        this.ctx = ctx;
+        this.maxConcurrentMerges = maxConcurrentMerges;
+        this.nSenders = nSenders;
+        this.sortFields = sortFields;
+        this.comparators = comparators;
+        this.recordDescriptor = recordDescriptor;
+        this.pbm = pbm;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
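+        // Single-level merge: all senders fit within the concurrency
+        // budget, so one batch containing every sender is merged directly.
+        // A multi-level merge for maxConcurrentMerges < nSenders is not
+        // implemented yet.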
+        if (maxConcurrentMerges >= nSenders) {
+            List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
+            for (int i = 0; i < nSenders; ++i) {
+                inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
+            }
+            List<IFrameReader> batch = new ArrayList<IFrameReader>();
+            pbm.getNextBatch(batch, nSenders);
+            merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
+                    comparators, recordDescriptor);
+        } else {
+            // multi level merge.
+            throw new HyracksDataException("Not yet supported");
+        }
+        merger.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        buffer.position(buffer.capacity());
+        buffer.limit(buffer.capacity());
+        return merger.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        merger.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
deleted file mode 100644
index 96b2a69..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.collectors;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
-
-public class SortMergePartitionCollector extends AbstractPartitionCollector {
-    private final int[] sortFields;
-
-    private final IBinaryComparator[] comparators;
-
-    private final RecordDescriptor recordDescriptor;
-
-    private final int maxConcurrentMerges;
-
-    private final IInputChannel[] channels;
-
-    private final int nSenders;
-
-    private final boolean stable;
-
-    private final FrameReader frameReader;
-
-    private final PartitionBatchManager pbm;
-
-    public SortMergePartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
-            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDescriptor,
-            int maxConcurrentMerges, int nSenders, boolean stable) {
-        super(ctx, connectorId, receiverIndex);
-        this.sortFields = sortFields;
-        this.comparators = comparators;
-        this.recordDescriptor = recordDescriptor;
-        this.maxConcurrentMerges = maxConcurrentMerges;
-        channels = new IInputChannel[nSenders];
-        this.nSenders = nSenders;
-        this.stable = stable;
-        this.frameReader = new FrameReader();
-        pbm = new NonDeterministicPartitionBatchManager();
-    }
-
-    @Override
-    public void open() throws HyracksException {
-    }
-
-    @Override
-    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
-        for (PartitionChannel pc : partitions) {
-            PartitionId pid = pc.getPartitionId();
-            IInputChannel channel = pc.getInputChannel();
-            InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
-            channel.registerMonitor(channelReader);
-            channel.setAttachment(channelReader);
-            int senderIndex = pid.getSenderIndex();
-            synchronized (this) {
-                channels[senderIndex] = channel;
-            }
-            pbm.addPartition(senderIndex);
-            channel.open();
-        }
-    }
-
-    @Override
-    public IFrameReader getReader() throws HyracksException {
-        return frameReader;
-    }
-
-    @Override
-    public void close() throws HyracksException {
-
-    }
-
-    @Override
-    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
-        Collection<PartitionId> requiredPartitionIds = new ArrayList<PartitionId>();
-        for (int i = 0; i < nSenders; ++i) {
-            requiredPartitionIds.add(new PartitionId(getJobId(), getConnectorId(), i, receiverIndex));
-        }
-        return requiredPartitionIds;
-    }
-
-    @Override
-    public void abort() {
-
-    }
-
-    private abstract class PartitionBatchManager {
-        protected abstract void addPartition(int index);
-
-        protected abstract void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException;
-    }
-
-    private class NonDeterministicPartitionBatchManager extends PartitionBatchManager {
-        private List<IFrameReader> partitions;
-
-        private List<IFrameReader> batch;
-
-        private int requiredSize;
-
-        public NonDeterministicPartitionBatchManager() {
-            partitions = new ArrayList<IFrameReader>();
-        }
-
-        @Override
-        protected void addPartition(int index) {
-            synchronized (SortMergePartitionCollector.this) {
-                if (batch != null && batch.size() < requiredSize) {
-                    batch.add((IFrameReader) channels[index].getAttachment());
-                    if (batch.size() == requiredSize) {
-                        SortMergePartitionCollector.this.notifyAll();
-                    }
-                } else {
-                    partitions.add((IFrameReader) channels[index].getAttachment());
-                }
-            }
-        }
-
-        @Override
-        protected void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException {
-            synchronized (SortMergePartitionCollector.this) {
-                if (partitions.size() <= size) {
-                    batch.addAll(partitions);
-                    partitions.clear();
-                } else if (partitions.size() > size) {
-                    List<IFrameReader> sublist = partitions.subList(0, size);
-                    batch.addAll(sublist);
-                    sublist.clear();
-                }
-                if (batch.size() == size) {
-                    return;
-                }
-                this.batch = batch;
-                this.requiredSize = size;
-                while (batch.size() < size) {
-                    try {
-                        SortMergePartitionCollector.this.wait();
-                    } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-                this.batch = null;
-            }
-        }
-    }
-
-    private static class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
-        private final IInputChannel channel;
-
-        private int availableFrames;
-
-        private boolean eos;
-
-        private boolean failed;
-
-        public InputChannelFrameReader(IInputChannel channel) {
-            this.channel = channel;
-            availableFrames = 0;
-            eos = false;
-            failed = false;
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-        }
-
-        @Override
-        public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            synchronized (this) {
-                while (!failed && !eos && availableFrames <= 0) {
-                    try {
-                        wait();
-                    } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-                if (failed) {
-                    throw new HyracksDataException("Failure occurred on input");
-                }
-                if (availableFrames <= 0 && eos) {
-                    return false;
-                }
-                --availableFrames;
-            }
-            ByteBuffer srcBuffer = channel.getNextBuffer();
-            FrameUtils.copy(srcBuffer, buffer);
-            channel.recycleBuffer(srcBuffer);
-            return true;
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-
-        }
-
-        @Override
-        public synchronized void notifyFailure(IInputChannel channel) {
-            failed = true;
-            notifyAll();
-        }
-
-        @Override
-        public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
-            availableFrames += nFrames;
-            notifyAll();
-        }
-
-        @Override
-        public synchronized void notifyEndOfStream(IInputChannel channel) {
-            eos = true;
-            notifyAll();
-        }
-    }
-
-    private class FrameReader implements IFrameReader {
-        private RunMergingFrameReader merger;
-
-        @Override
-        public void open() throws HyracksDataException {
-            if (maxConcurrentMerges >= nSenders) {
-                List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
-                for (int i = 0; i < nSenders; ++i) {
-                    inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
-                }
-                List<IFrameReader> batch = new ArrayList<IFrameReader>();
-                pbm.getNextBatch(batch, nSenders);
-                merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames,
-                        sortFields, comparators, recordDescriptor);
-            } else {
-                // multi level merge.
-                throw new HyracksDataException("Not yet supported");
-            }
-            merger.open();
-        }
-
-        @Override
-        public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            buffer.position(buffer.capacity());
-            buffer.limit(buffer.capacity());
-            return merger.nextFrame(buffer);
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            merger.close();
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 7d92150..0e1e413 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -25,7 +25,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
 
@@ -75,8 +77,11 @@
             if (localityMap.isConnected(i, receiverIndex, nConsumerPartitions))
                 expectedPartitions.set(i);
         }
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), receiverIndex, nProducerPartitions,
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
                 expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
+                channelReader);
     }
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 12935af..291e149 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -25,7 +25,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -50,7 +52,11 @@
             int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
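+        // Same non-deterministic composition as above; a partitioning consumer
+        // expects a channel from every producer partition.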
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
                 expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 54979f0..2576423 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.connectors;
 
+import java.util.BitSet;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
@@ -25,7 +28,10 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.SortMergePartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.IPartitionBatchManager;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.SortMergeFrameReader;
 
 public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -65,7 +71,15 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        return new SortMergePartitionCollector(ctx, getConnectorId(), index, sortFields, comparators, recordDesc,
-                nProducerPartitions, nProducerPartitions, stable);
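+        // Sort-merge variant: the batch manager feeds sender channels to the
+        // frame reader, which merges them on sortFields. The two counts are
+        // presumably (maxConcurrentMerges, nSenders); note the old `stable`
+        // flag is no longer forwarded.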
+        IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
+        IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
+                sortFields, comparators, recordDesc, pbm);
+        BitSet expectedPartitions = new BitSet(nProducerPartitions);
+        expectedPartitions.set(0, nProducerPartitions);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index ba66ebc..fd67f77 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -25,7 +25,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     public MToNReplicatingConnectorDescriptor(JobSpecification spec) {
@@ -82,7 +84,11 @@
             int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
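+        // Replicating: every consumer reads all producer partitions, hence
+        // the full bitmap built above.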
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
                 expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 7c227e2..e8d9751 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -30,7 +30,9 @@
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -51,8 +53,12 @@
             int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(index);
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
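+        // One-to-one: only the producer partition matching this consumer's
+        // index is expected, so a single bit is set.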
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
                 expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
 
     @Override