Merge branch 'mad-hatter' into 'master'

Change-Id: I24f42dc32931a214a27c2c88fde163f665cc83db
diff --git a/asterixdb/asterix-algebra/src/main/javacc/AQLPlusExtension.jj b/asterixdb/asterix-algebra/src/main/javacc/AQLPlusExtension.jj
index c3cfdb1..35e8ff0 100644
--- a/asterixdb/asterix-algebra/src/main/javacc/AQLPlusExtension.jj
+++ b/asterixdb/asterix-algebra/src/main/javacc/AQLPlusExtension.jj
@@ -106,30 +106,24 @@
 @new
 Clause MetaVariableClause() throws ParseException :
 {
-    MetaVariableClause mc = new MetaVariableClause();
-    VarIdentifier var = new VarIdentifier();
 }
 {
     <METAVARIABLECLAUSE>
     {
-      mc.setVar(var);
-      var.setValue(token.image);
-      return mc;
+      VarIdentifier var = new VarIdentifier(token.image);
+      return new MetaVariableClause(var);
     }
 }
 
 @new
 MetaVariableExpr MetaVariableRef() throws ParseException:
 {
-    MetaVariableExpr metaVarExp = new MetaVariableExpr();
-    VarIdentifier var = new VarIdentifier();
 }
 {
-      <METAVARIABLE>
+    <METAVARIABLE>
     {
-     metaVarExp.setVar(var);
-     var.setValue(token.image);
-     return metaVarExp;
+      VarIdentifier var = new VarIdentifier(token.image);
+      return new MetaVariableExpr(var);
     }
 }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index aed358e..60471f1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -557,11 +557,20 @@
         SourceLocation sourceLoc = dd.getSourceLocation();
         String dataverseName = getActiveDataverse(dd.getDataverse());
         String datasetName = dd.getName().getValue();
+        String datasetFullyQualifiedName = dataverseName + "." + datasetName;
         DatasetType dsType = dd.getDatasetType();
         String itemTypeDataverseName = getActiveDataverse(dd.getItemTypeDataverse());
         String itemTypeName = dd.getItemTypeName().getValue();
-        String metaItemTypeDataverseName = getActiveDataverse(dd.getMetaItemTypeDataverse());
-        String metaItemTypeName = dd.getMetaItemTypeName().getValue();
+        String itemTypeFullyQualifiedName = itemTypeDataverseName + "." + itemTypeName;
+        String metaItemTypeDataverseName = null;
+        String metaItemTypeName = null;
+        String metaItemTypeFullyQualifiedName = null;
+        Identifier metaItemTypeId = dd.getMetaItemTypeName();
+        if (metaItemTypeId != null) {
+            metaItemTypeName = metaItemTypeId.getValue();
+            metaItemTypeDataverseName = getActiveDataverse(dd.getMetaItemTypeDataverse());
+            metaItemTypeFullyQualifiedName = metaItemTypeDataverseName + "." + metaItemTypeName;
+        }
         Identifier ngNameId = dd.getNodegroupName();
         String nodegroupName = ngNameId == null ? null : ngNameId.getValue();
         String compactionPolicy = dd.getCompactionPolicy();
@@ -573,12 +582,12 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName,
-                itemTypeDataverseName, itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
-                metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
-                dataverseName + "." + datasetName, defaultCompactionPolicy);
+                itemTypeDataverseName, itemTypeFullyQualifiedName, metaItemTypeDataverseName,
+                metaItemTypeFullyQualifiedName, nodegroupName, compactionPolicy, datasetFullyQualifiedName,
+                defaultCompactionPolicy);
         Dataset dataset = null;
         try {
-            IDatasetDetails datasetDetails = null;
+            IDatasetDetails datasetDetails;
             Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds != null) {
                 if (dd.getIfNotExists()) {
@@ -1372,8 +1381,8 @@
                 }
             }
 
-            if (activeDataverse != null && activeDataverse.getDataverseName() == dataverseName) {
-                activeDataverse = null;
+            if (activeDataverse.getDataverseName().equals(dataverseName)) {
+                activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
@@ -1383,8 +1392,8 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                if (activeDataverse != null && activeDataverse.getDataverseName() == dataverseName) {
-                    activeDataverse = null;
+                if (activeDataverse.getDataverseName().equals(dataverseName)) {
+                    activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
                 }
 
                 // #. execute compensation operations
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/ForClause.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/ForClause.java
index 9bae2a0..39ad04f 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/ForClause.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/ForClause.java
@@ -31,17 +31,14 @@
     private Expression inExpr = null;
 
     public ForClause() {
-        super();
     }
 
     public ForClause(VariableExpr varExpr, Expression inExpr) {
-        super();
         this.varExpr = varExpr;
         this.inExpr = inExpr;
     }
 
     public ForClause(VariableExpr varExpr, Expression inExpr, VariableExpr posExpr) {
-        super();
         this.varExpr = varExpr;
         this.inExpr = inExpr;
         this.posExpr = posExpr;
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/MetaVariableClause.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/MetaVariableClause.java
index 7712cf9..bbf7269 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/MetaVariableClause.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/clause/MetaVariableClause.java
@@ -27,6 +27,13 @@
 public class MetaVariableClause extends AbstractClause {
     private VarIdentifier var;
 
+    public MetaVariableClause() {
+    }
+
+    public MetaVariableClause(VarIdentifier var) {
+        this.var = var;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return ((IAQLPlusVisitor<R, T>) visitor).visitMetaVariableClause(this, arg);
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/expression/MetaVariableExpr.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/expression/MetaVariableExpr.java
index 2964eff..fb03312 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/expression/MetaVariableExpr.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/expression/MetaVariableExpr.java
@@ -21,10 +21,15 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.aql.visitor.base.IAQLPlusVisitor;
 import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 
 public class MetaVariableExpr extends VariableExpr {
 
+    public MetaVariableExpr(VarIdentifier var) {
+        super(var);
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return ((IAQLPlusVisitor<R, T>) visitor).visitMetaVariableExpr(this, arg);
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 3bb70c1..810d8e9 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -152,6 +152,7 @@
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.util.RangeMapBuilder;
 import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -184,6 +185,8 @@
     // data generator hints
     private static final String DGEN_HINT = "dgen";
 
+    private static final String INT_TYPE_NAME = "int";
+
     private static class IndexParams {
       public IndexType type;
       public int gramLength;
@@ -797,15 +800,13 @@
 {
   <LEFTPAREN> (<VARIABLE>
     {
-      var = new VarIdentifier();
-      var.setValue(token.image);
+      var = new VarIdentifier(token.image);
       paramList.add(var);
       getCurrentScope().addNewVarSymbolToScope(var);
     }
   ( <COMMA> <VARIABLE>
     {
-      var = new VarIdentifier();
-      var.setValue(token.image);
+      var = new VarIdentifier(token.image);
       paramList.add(var);
       getCurrentScope().addNewVarSymbolToScope(var);
     }
@@ -1386,8 +1387,8 @@
 {
  id = QualifiedName()
    {
-     if (id.first == null && id.second.getValue().equalsIgnoreCase("int")) {
-        id.second.setValue("int64");
+     if (id.first == null && id.second.getValue().equalsIgnoreCase(INT_TYPE_NAME)) {
+        id.second = new Identifier(BuiltinType.AINT64.getTypeName());
      }
 
      return new TypeReferenceExpression(id);
@@ -1460,8 +1461,8 @@
         result.function  = third;
       }
 
-      if (result.function.equalsIgnoreCase("int")) {
-            result.function = "int64";
+      if (result.function.equalsIgnoreCase(INT_TYPE_NAME)) {
+        result.function = BuiltinType.AINT64.getTypeName();
       }
       return result;
     }
@@ -2086,8 +2087,6 @@
 
 VariableExpr VariableRef() throws ParseException:
 {
-    VariableExpr varExp = new VariableExpr();
-    VarIdentifier var = new VarIdentifier();
 }
 {
   <VARIABLE>
@@ -2097,13 +2096,13 @@
      if (isInForbiddenScopes(varName)) {
        throw new ParseException("Inside limit clauses, it is disallowed to reference a variable having the same name as any variable bound in the same scope as the limit clause.");
      }
+     VariableExpr varExp;
      if(ident != null) { // exist such ident
+       varExp = new VariableExpr((VarIdentifier)ident);
        varExp.setIsNewVar(false);
-       varExp.setVar((VarIdentifier)ident);
      } else {
-       varExp.setVar(var);
+       varExp = new VariableExpr(new VarIdentifier(varName));
      }
-     var.setValue(varName);
      return varExp;
     }
 }
@@ -2111,18 +2110,16 @@
 
 VariableExpr Variable() throws ParseException:
 {
-    VariableExpr varExp = new VariableExpr();
-    VarIdentifier var = new VarIdentifier();
 }
 {
   <VARIABLE>
     {
-     Identifier ident = lookupSymbol(token.image);
+     String varName = token.image;
+     Identifier ident = lookupSymbol(varName);
+     VariableExpr varExp = new VariableExpr(new VarIdentifier(varName));
      if(ident != null) { // exist such ident
        varExp.setIsNewVar(false);
      }
-     varExp.setVar(var);
-     var.setValue(token.image);
      return varExp;
     }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/expression/VariableExpr.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/expression/VariableExpr.java
index 17d0d2f..3ed3221 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/expression/VariableExpr.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/expression/VariableExpr.java
@@ -29,11 +29,6 @@
     private VarIdentifier var;
     private boolean isNewVar;
 
-    public VariableExpr() {
-        super();
-        isNewVar = true;
-    }
-
     public VariableExpr(VarIdentifier var) {
         super();
         this.var = var;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 0a17b24..ac019fc 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -101,16 +101,12 @@
         }
     }
 
-    public Identifier getMetaName() {
-        return name;
-    }
-
     public Identifier getMetaItemTypeName() {
-        return metaItemTypeName == null ? new Identifier() : metaItemTypeName;
+        return metaItemTypeName;
     }
 
     public Identifier getMetaItemTypeDataverse() {
-        return metaItemTypeDataverse == null ? new Identifier() : metaItemTypeDataverse;
+        return metaItemTypeDataverse;
     }
 
     public String getQualifiedMetaTypeName() {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/Identifier.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/Identifier.java
index 1443833..7f8171c 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/Identifier.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/Identifier.java
@@ -21,11 +21,8 @@
 import java.util.Objects;
 
 public class Identifier {
-    protected String value;
 
-    public Identifier() {
-        // default constructor.
-    }
+    protected final String value;
 
     public Identifier(String value) {
         this.value = value;
@@ -35,10 +32,6 @@
         return value;
     }
 
-    public final void setValue(String value) {
-        this.value = value;
-    }
-
     @Override
     public String toString() {
         return value;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/VarIdentifier.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/VarIdentifier.java
index 587dd1c..4fea88b 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/VarIdentifier.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/struct/VarIdentifier.java
@@ -21,10 +21,8 @@
 import java.util.Objects;
 
 public final class VarIdentifier extends Identifier {
-    private int id = 0;
 
-    public VarIdentifier() {
-    }
+    private int id;
 
     public VarIdentifier(VarIdentifier v) {
         this(v.getValue(), v.getId());
@@ -35,7 +33,7 @@
     }
 
     public VarIdentifier(String value, int id) {
-        this.value = value;
+        super(value);
         this.id = id;
     }
 
@@ -48,11 +46,6 @@
     }
 
     @Override
-    public VarIdentifier clone() {
-        return new VarIdentifier(value, id);
-    }
-
-    @Override
     public int hashCode() {
         return Objects.hash(value);
     }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/ExpressionToVariableUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/ExpressionToVariableUtil.java
index d8534be..21ae883 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/ExpressionToVariableUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/ExpressionToVariableUtil.java
@@ -96,8 +96,7 @@
         try {
             String varName = getGeneratedIdentifier(expr);
             VarIdentifier var = new VarIdentifier(varName);
-            VariableExpr varExpr = new VariableExpr();
-            varExpr.setVar(var);
+            VariableExpr varExpr = new VariableExpr(var);
             varExpr.setSourceLocation(expr.getSourceLocation());
             return varExpr;
         } catch (ParseException e) {
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 60cdf2e..e0ad033 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -181,6 +181,7 @@
 import org.apache.asterix.lang.sqlpp.util.FunctionMapUtil;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -216,6 +217,8 @@
     private static final String TIES = "TIES";
     private static final String UNBOUNDED = "UNBOUNDED";
 
+    private static final String INT_TYPE_NAME = "int";
+
     // error configuration
     protected static final boolean REPORT_EXPECTED_TOKENS = false;
 
@@ -1644,8 +1647,8 @@
 {
   id = QualifiedName()
   {
-    if (id.first == null && id.second.getValue().equalsIgnoreCase("int")) {
-      id.second.setValue("int64");
+    if (id.first == null && id.second.getValue().equalsIgnoreCase(INT_TYPE_NAME)) {
+      id.second = new Identifier(BuiltinType.AINT64.getTypeName());
     }
 
     TypeReferenceExpression typeRef = new TypeReferenceExpression(id);
@@ -1727,8 +1730,8 @@
         result.function  = third;
       }
 
-      if (result.function.equalsIgnoreCase("int")) {
-            result.function = "int64";
+      if (result.function.equalsIgnoreCase(INT_TYPE_NAME)) {
+         result.function = BuiltinType.AINT64.getTypeName();
       }
       return result;
     }
@@ -2564,7 +2567,6 @@
 
 VariableExpr VariableRef() throws ParseException:
 {
-    VarIdentifier var = new VarIdentifier();
     String id = null;
 }
 {
@@ -2576,13 +2578,12 @@
        throw new SqlppParseException(getSourceLocation(token),
         "Inside limit clauses, it is disallowed to reference a variable having the same name as any variable bound in the same scope as the limit clause.");
      }
-     VariableExpr varExp = new VariableExpr();
+     VariableExpr varExp;
      if (ident != null) { // exist such ident
-       varExp.setVar((VarIdentifier)ident);
+       varExp = new VariableExpr((VarIdentifier)ident);
      } else {
-       varExp.setVar(var);
+       varExp = new VariableExpr(new VarIdentifier(id));
        varExp.setIsNewVar(false);
-       var.setValue(id);
      }
      return addSourceLocation(varExp, token);
     }
@@ -2590,7 +2591,6 @@
 
 VariableExpr Variable() throws ParseException:
 {
-    VarIdentifier var = new VarIdentifier();
     String id = null;
 }
 {
@@ -2598,12 +2598,10 @@
     {
      id = SqlppVariableUtil.toInternalVariableName(id); // prefix user-defined variables with "$".
      Identifier ident = lookupSymbol(id);
-     VariableExpr varExp = new VariableExpr();
-     if(ident != null) { // exist such ident
+     VariableExpr varExp = new VariableExpr(new VarIdentifier(id));
+     if (ident != null) { // exist such ident
        varExp.setIsNewVar(false);
      }
-     varExp.setVar(var);
-     var.setValue(id);
      return addSourceLocation(varExp, token);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index bb0081a..fe96d4f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -45,10 +45,11 @@
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.DynamicFieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.DynamicRangeMapSupplier;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMapSupplier;
+import org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 
 public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
@@ -119,14 +120,10 @@
             comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
             i++;
         }
-        FieldRangePartitionComputerFactory partitionerFactory;
-        if (rangeMapIsComputedAtRunTime) {
-            partitionerFactory = new DynamicFieldRangePartitionComputerFactory(sortFields, comps, rangeMapKeyInContext,
-                    op.getSourceLocation());
-        } else {
-            partitionerFactory = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
-        }
-
+        RangeMapSupplier rangeMapSupplier = rangeMapIsComputedAtRunTime
+                ? new DynamicRangeMapSupplier(rangeMapKeyInContext) : new StaticRangeMapSupplier(rangeMap);
+        FieldRangePartitionComputerFactory partitionerFactory =
+                new FieldRangePartitionComputerFactory(sortFields, comps, rangeMapSupplier, op.getSourceLocation());
         IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
         return new Pair<>(conn, null);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 15a2d8f..ba4a6b7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -52,8 +52,9 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 
 public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
@@ -138,7 +139,8 @@
             comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
             i++;
         }
-        ITuplePartitionComputerFactory tpcf = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps,
+                new StaticRangeMapSupplier(rangeMap), op.getSourceLocation());
         IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
new file mode 100644
index 0000000..13c5ed8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow.value;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITupleMultiPartitionComputer {
+    /**
+     * For the tuple (located at tIndex in the frame), it determines which target partitions (0,1,... nParts-1) the
+     * tuple should be sent/written to.
+     * @param accessor The accessor of the frame to access tuples
+     * @param tIndex The index of the tuple in consideration
+     * @param nParts The number of target partitions
+     * @return The chosen target partitions as dictated by the logic of the partition computer
+     * @throws HyracksDataException
+     */
+    BitSet partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException;
+
+    /**
+     * Gives the data partitioner a chance to set up its environment before it starts partitioning tuples. This method
+     * should be called in the open() of {@link org.apache.hyracks.api.comm.IFrameWriter}. The default implementation
+     * is "do nothing".
+     * @throws HyracksDataException
+     */
+    default void initialize() throws HyracksDataException {
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
new file mode 100644
index 0000000..c8821c3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public interface ITupleMultiPartitionComputerFactory extends Serializable {
+    ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
deleted file mode 100644
index bc642a9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
-
-public class DynamicFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private final String rangeMapKeyInContext;
-    private final SourceLocation sourceLocation;
-
-    public DynamicFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            String rangeMapKeyInContext, SourceLocation sourceLocation) {
-        super(rangeFields, comparatorFactories);
-        this.rangeMapKeyInContext = rangeMapKeyInContext;
-        this.sourceLocation = sourceLocation;
-    }
-
-    @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException {
-        RangeMap rangeMap = TaskUtil.get(rangeMapKeyInContext, hyracksTaskContext);
-        if (rangeMap == null) {
-            throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
-        }
-        return rangeMap;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
similarity index 64%
copy from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
copy to hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
index b17c550..cfc4b82 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
@@ -16,23 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
-public class StaticFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
+public final class DynamicRangeMapSupplier implements RangeMapSupplier {
+
     private static final long serialVersionUID = 1L;
-    private RangeMap rangeMap;
 
-    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            RangeMap rangeMap) {
-        super(rangeFields, comparatorFactories);
-        this.rangeMap = rangeMap;
+    private final String rangeMapKeyInContext;
+
+    public DynamicRangeMapSupplier(String rangeMapKeyInContext) {
+        this.rangeMapKeyInContext = rangeMapKeyInContext;
     }
 
-    @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
-        return rangeMap;
+    public RangeMap getRangeMap(IHyracksTaskContext taskContext) {
+        return TaskUtil.get(rangeMapKeyInContext, taskContext);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 55d4420..1831a5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -24,22 +24,27 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public abstract class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+public final class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final int[] rangeFields;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final RangeMapSupplier rangeMapSupplier;
+    private final SourceLocation sourceLocation;
 
-    protected FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories) {
+    public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+            RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
         this.rangeFields = rangeFields;
+        this.rangeMapSupplier = rangeMapSupplier;
         this.comparatorFactories = comparatorFactories;
+        this.sourceLocation = sourceLocation;
     }
 
-    protected abstract RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException;
-
     @Override
-    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -50,7 +55,10 @@
 
             @Override
             public void initialize() throws HyracksDataException {
-                rangeMap = getRangeMap(hyracksTaskContext);
+                rangeMap = rangeMapSupplier.getRangeMap(taskContext);
+                if (rangeMap == null) {
+                    throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
+                }
             }
 
             @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
similarity index 60%
copy from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
copy to hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
index b17c550..fc0911e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
@@ -16,23 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
+import java.io.Serializable;
+
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
-public class StaticFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private RangeMap rangeMap;
-
-    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            RangeMap rangeMap) {
-        super(rangeFields, comparatorFactories);
-        this.rangeMap = rangeMap;
-    }
-
-    @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
-        return rangeMap;
-    }
+public interface RangeMapSupplier extends Serializable {
+    RangeMap getRangeMap(IHyracksTaskContext taskContext);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
similarity index 69%
rename from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
index b17c550..613ccc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
@@ -16,23 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
-public class StaticFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
+public final class StaticRangeMapSupplier implements RangeMapSupplier {
+
     private static final long serialVersionUID = 1L;
-    private RangeMap rangeMap;
 
-    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            RangeMap rangeMap) {
-        super(rangeFields, comparatorFactories);
+    private final RangeMap rangeMap;
+
+    public StaticRangeMapSupplier(RangeMap rangeMap) {
         this.rangeMap = rangeMap;
     }
 
     @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
+    public RangeMap getRangeMap(IHyracksTaskContext taskContext) {
         return rangeMap;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
new file mode 100644
index 0000000..03f260a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.util.trace.ITracer;
+
+abstract class AbstractPartitionDataWriter implements IFrameWriter {
+
+    protected final int consumerPartitionCount;
+    private final IFrameWriter[] pWriters;
+    private final boolean[] isOpen;
+    private final FrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final IHyracksTaskContext ctx;
+    private boolean[] allocatedFrames;
+    private boolean failed = false;
+
+    public AbstractPartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor) throws HyracksDataException {
+        this.ctx = ctx;
+        this.consumerPartitionCount = consumerPartitionCount;
+        pWriters = new IFrameWriter[consumerPartitionCount];
+        isOpen = new boolean[consumerPartitionCount];
+        allocatedFrames = new boolean[consumerPartitionCount];
+        appenders = new FrameTupleAppender[consumerPartitionCount];
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        initializeAppenders(pwFactory);
+    }
+
+    protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; ++i) {
+            try {
+                pWriters[i] = pwFactory.createFrameWriter(i);
+                appenders[i] = createTupleAppender(ctx);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new FrameTupleAppender();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
+        if (!failed) {
+            boolean newFailure = false;
+            for (int i = 0; i < pWriters.length; ++i) {
+                try {
+                    if (isOpen[i] && allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
+                        appenders[i].write(pWriters[i], true);
+                    }
+                } catch (Exception e) {
+                    newFailure = true;
+                    closeException = wrapException(closeException, e);
+                    break;
+                }
+            }
+            if (newFailure) {
+                try {
+                    fail(); // Fail all writers if any new failure happens.
+                } catch (Exception e) {
+                    closeException = wrapException(closeException, e);
+                }
+            }
+        }
+        for (int i = 0; i < pWriters.length; ++i) {
+            if (isOpen[i]) {
+                // The try-block make sures that every writer is closed.
+                try {
+                    pWriters[i].close();
+                } catch (Exception e) {
+                    closeException = wrapException(closeException, e);
+                }
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        for (int i = 0; i < pWriters.length; ++i) {
+            isOpen[i] = true;
+            pWriters[i].open();
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tupleAccessor.reset(buffer);
+        int tupleCount = tupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            processTuple(i);
+        }
+    }
+
+    protected abstract void processTuple(int tupleIndex) throws HyracksDataException;
+
+    protected void appendToPartitionWriter(int tupleIndex, int partition) throws HyracksDataException {
+        if (!allocatedFrames[partition]) {
+            allocateFrames(partition);
+        }
+        FrameUtils.appendToWriter(pWriters[partition], appenders[partition], tupleAccessor, tupleIndex);
+    }
+
+    protected void allocateFrames(int i) throws HyracksDataException {
+        appenders[i].reset(new VSizeFrame(ctx), true);
+        allocatedFrames[i] = true;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        HyracksDataException failException = null;
+        for (int i = 0; i < appenders.length; ++i) {
+            if (isOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Exception e) {
+                    failException = wrapException(failException, e);
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
+        }
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            if (allocatedFrames[i]) {
+                appenders[i].flush(pWriters[i]);
+            }
+        }
+    }
+
+    public void flush(ITracer tracer, String name, long cat, String args) throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            if (allocatedFrames[i]) {
+                appenders[i].flush(pWriters[i], tracer, name, cat, args);
+            }
+        }
+    }
+
+    // Wraps the current encountered exception into the final exception.
+    private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) {
+        if (finalException == null) {
+            return HyracksDataException.create(currentException);
+        }
+        finalException.addSuppressed(currentException);
+        return finalException;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
new file mode 100644
index 0000000..5d79f8c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+
+public class MToNPartialBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    protected ITupleMultiPartitionComputerFactory tpcf;
+
+    public MToNPartialBroadcastConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITupleMultiPartitionComputerFactory tpcf) {
+        super(spec);
+        this.tpcf = tpcf;
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        return new MultiPartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner(ctx));
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
new file mode 100644
index 0000000..aed39df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MultiPartitionDataWriter extends AbstractPartitionDataWriter {
+
+    private final ITupleMultiPartitionComputer tpc;
+
+    public MultiPartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITupleMultiPartitionComputer tpc)
+            throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
+        this.tpc = tpc;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        tpc.initialize();
+    }
+
+    @Override
+    protected void processTuple(int tupleIndex) throws HyracksDataException {
+        BitSet partitionSet = tpc.partition(tupleAccessor, tupleIndex, consumerPartitionCount);
+        for (int p = partitionSet.nextSetBit(0); p >= 0; p = partitionSet.nextSetBit(p + 1)) {
+            appendToPartitionWriter(tupleIndex, p);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index d06d5d3..e67f9a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -18,167 +18,31 @@
  */
 package org.apache.hyracks.dataflow.std.connectors;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.util.trace.ITracer;
 
-public class PartitionDataWriter implements IFrameWriter {
-    private final int consumerPartitionCount;
-    private final IFrameWriter[] pWriters;
-    private final boolean[] isOpen;
-    private final FrameTupleAppender[] appenders;
-    private final FrameTupleAccessor tupleAccessor;
+public class PartitionDataWriter extends AbstractPartitionDataWriter {
+
     private final ITuplePartitionComputer tpc;
-    private final IHyracksTaskContext ctx;
-    private boolean[] allocatedFrames;
-    private boolean failed = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
-        this.ctx = ctx;
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
         this.tpc = tpc;
-        this.consumerPartitionCount = consumerPartitionCount;
-        pWriters = new IFrameWriter[consumerPartitionCount];
-        isOpen = new boolean[consumerPartitionCount];
-        allocatedFrames = new boolean[consumerPartitionCount];
-        appenders = new FrameTupleAppender[consumerPartitionCount];
-        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
-        initializeAppenders(pwFactory);
-    }
-
-    protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
-        for (int i = 0; i < consumerPartitionCount; ++i) {
-            try {
-                pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = createTupleAppender(ctx);
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
-        }
-    }
-
-    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
-        return new FrameTupleAppender();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        HyracksDataException closeException = null;
-        if (!failed) {
-            boolean newFailure = false;
-            for (int i = 0; i < pWriters.length; ++i) {
-                try {
-                    if (isOpen[i] && allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
-                        appenders[i].write(pWriters[i], true);
-                    }
-                } catch (Exception e) {
-                    newFailure = true;
-                    closeException = wrapException(closeException, e);
-                    break;
-                }
-            }
-            if (newFailure) {
-                try {
-                    fail(); // Fail all writers if any new failure happens.
-                } catch (Exception e) {
-                    closeException = wrapException(closeException, e);
-                }
-            }
-        }
-        for (int i = 0; i < pWriters.length; ++i) {
-            if (isOpen[i]) {
-                // The try-block make sures that every writer is closed.
-                try {
-                    pWriters[i].close();
-                } catch (Exception e) {
-                    closeException = wrapException(closeException, e);
-                }
-            }
-        }
-        if (closeException != null) {
-            throw closeException;
-        }
     }
 
     @Override
     public void open() throws HyracksDataException {
+        super.open();
         tpc.initialize();
-        for (int i = 0; i < pWriters.length; ++i) {
-            isOpen[i] = true;
-            pWriters[i].open();
-        }
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        tupleAccessor.reset(buffer);
-        int tupleCount = tupleAccessor.getTupleCount();
-        for (int i = 0; i < tupleCount; ++i) {
-            int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
-            if (!allocatedFrames[h]) {
-                allocateFrames(h);
-            }
-            FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
-        }
-    }
-
-    protected void allocateFrames(int i) throws HyracksDataException {
-        appenders[i].reset(new VSizeFrame(ctx), true);
-        allocatedFrames[i] = true;
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        failed = true;
-        HyracksDataException failException = null;
-        for (int i = 0; i < appenders.length; ++i) {
-            if (isOpen[i]) {
-                try {
-                    pWriters[i].fail();
-                } catch (Exception e) {
-                    failException = wrapException(failException, e);
-                }
-            }
-        }
-        if (failException != null) {
-            throw failException;
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        for (int i = 0; i < consumerPartitionCount; i++) {
-            if (allocatedFrames[i]) {
-                appenders[i].flush(pWriters[i]);
-            }
-        }
-    }
-
-    public void flush(ITracer tracer, String name, long cat, String args) throws HyracksDataException {
-        for (int i = 0; i < consumerPartitionCount; i++) {
-            if (allocatedFrames[i]) {
-                appenders[i].flush(pWriters[i], tracer, name, cat, args);
-            }
-        }
-    }
-
-    // Wraps the current encountered exception into the final exception.
-    private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) {
-        if (finalException == null) {
-            return HyracksDataException.create(currentException);
-        }
-        finalException.addSuppressed(currentException);
-        return finalException;
+    protected void processTuple(int tupleIndex) throws HyracksDataException {
+        int p = tpc.partition(tupleAccessor, tupleIndex, consumerPartitionCount);
+        appendToPartitionWriter(tupleIndex, p);
     }
 }