Merge branch 'master' into results
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index b7d775a..cc8204e 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -276,6 +276,16 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-data</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-control-common</artifactId>
       <version>${hyracks.version}</version>
     </dependency>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index 69145d9..c48ec54 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -108,9 +108,8 @@
     public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
             Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
             DeployedJobSpecEventListener listener) throws Exception {
-        long executionMilliseconds =
-                runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener,
-                        null);
+        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory,
+                null, listener, null);
         if (executionMilliseconds > period) {
             LOGGER.log(Level.SEVERE,
                     "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
@@ -153,7 +152,6 @@
 
     }
 
-
     public static long findPeriod(String duration) {
         //TODO: Allow Repetitive Channels to use YMD durations
         String hoursMinutesSeconds = "";
@@ -189,7 +187,8 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         JobSpecification jobSpec = null;
         try {
-            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null, null,
+                    null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
         } catch (Exception e) {
@@ -230,7 +229,7 @@
                 jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
             } else {
                 jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
-                        null, null, null, null, true, null);
+                        null, null, null, null, true, null, null, null);
             }
         } else {
             //Procedures
@@ -263,7 +262,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         JobSpecification jobSpec;
         try {
-            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
+            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null, null, null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
@@ -277,14 +276,15 @@
             IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
         if (procedureStatement.getKind() == Statement.Kind.INSERT) {
             return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                    procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+                    procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
+                    null, null);
         } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
             return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
         } else {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             procedureStatement.accept(visitor, null);
             return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
-                    hcc, true);
+                    hcc, true, null, null);
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
index 2a3d4fb..99f0d66 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -18,17 +18,11 @@
  */
 package org.apache.asterix.bad.lang;
 
-import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.compiler.provider.IRuleSetFactory;
-import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.IRewriterFactory;
-import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
-import org.apache.asterix.lang.sqlpp.visitor.SqlppAstPrintVisitorFactory;
-import org.apache.asterix.translator.SqlppExpressionToPlanTranslatorFactory;
 
-public class BADCompilationProvider implements ILangCompilationProvider {
+public class BADCompilationProvider extends SqlppCompilationProvider {
 
     @Override
     public IParserFactory getParserFactory() {
@@ -36,21 +30,6 @@
     }
 
     @Override
-    public IRewriterFactory getRewriterFactory() {
-        return new SqlppRewriterFactory();
-    }
-
-    @Override
-    public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
-        return new SqlppAstPrintVisitorFactory();
-    }
-
-    @Override
-    public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
-        return new SqlppExpressionToPlanTranslatorFactory();
-    }
-
-    @Override
     public IRuleSetFactory getRuleSetFactory() {
         return new BADRuleSetFactory();
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 2f23a9c..3ea2c0d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -334,7 +334,7 @@
                 }
             }
         }
-        final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
+        final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null, null);
         for (Channel channel : channels) {
             if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) {
                 continue;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index cb2498c..5803f60 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -208,13 +208,13 @@
                 InsertStatement insert = new InsertStatement(new Identifier(dataverse),
                         new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
                 ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
-                        resultDelivery, null, stats, false, null);
+                        resultDelivery, null, stats, false, null, null, null);
             } else {
                 //To update an existing subscription
                 UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
                         new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
                 ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
-                        resultDelivery, null, stats, false, null);
+                        resultDelivery, null, stats, false, null, null, null);
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 03a3f1d..e6ba69c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -139,7 +139,8 @@
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
-            ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null,
+                    null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 1551a7c..b2ffa1b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -277,7 +277,7 @@
                     (Query) fStatements.get(1));
         }
         return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
-                hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
+                hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null);
     }
 
     @Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index be5bedb..aaf2bfa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -176,7 +176,7 @@
 
     private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats)
-                    throws Exception {
+            throws Exception {
         if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
             if (!varList.isEmpty()) {
                 throw new CompilationException("Insert procedures cannot have parameters");
@@ -185,9 +185,8 @@
             dependencies.get(0).add(Arrays.asList(
                     ((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()),
                     insertStatement.getDatasetName().getValue()));
-            return new Pair<>(
-                    ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                            getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null),
+            return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+                    getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null, null, null),
                     PrecompiledType.INSERT);
         } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
             SqlppRewriterFactory fact = new SqlppRewriterFactory();
@@ -209,7 +208,7 @@
                     metadataProvider);
             Pair<JobSpecification, PrecompiledType> pair =
                     new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
-                    getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
+                            getProcedureBodyStatement(), hcc, true, null, null), PrecompiledType.DELETE);
             return pair;
         } else {
             throw new CompilationException("Procedure can only execute a single delete, insert, or query");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index d34d170..89d940e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -122,7 +122,7 @@
             listener.suspend();
             activeEventHandler.registerListener(listener);
             BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor,
-                    hcc, new RequestParameters(null, null, null, null, null, null), true);
+                    hcc, new RequestParameters(null, null, null, null, null, null, null), true);
 
             ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(),
                     hcc,
@@ -149,7 +149,7 @@
                             new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
                                     ResultReader.NUM_READERS),
                             new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
-                            new IStatementExecutor.Stats(), null, null, null),
+                            new IStatementExecutor.Stats(), null, null, null, null),
                     true);
             metadataProvider.getLocks().unlock();
             //Log that the procedure stopped by cluster restart. Procedure is available again now.
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..8e07af2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
 
 package org.apache.asterix.bad.runtime;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
     private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
-    private final AOrderedListSerializerDeserializer subSerDes =
-            new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
-    private final ARecordSerializerDeserializer recordSerDes;
+    private static final AStringSerializerDeserializer stringSerDes =
+            new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+    private final IPrinter recordPrinterFactory;
+    private final IPrinter subscriptionIdListPrinterFactory;
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@
     private IScalarEvaluator eval0;
     private IScalarEvaluator eval1;
     private IScalarEvaluator eval2;
-    private final ActiveManager activeManager;
     private final EntityId entityId;
     private final boolean push;
-    private AOrderedList pushList;
-    private ARecord pushRecord;
-    private final IAType recordType;
-    private final Map<String, HashSet<String>> sendData = new HashMap<>();
+    private final Map<String, String> sendData = new HashMap<>();
+    private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
+    private final Map<String, PrintStream> sendStreams = new HashMap<>();
     private String executionTimeString;
+    private boolean firstResult = true;
+    String endpoint;
 
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
             IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
         eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
-        this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
-                .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
         this.push = push;
-        this.pushList = null;
-        this.pushRecord = null;
-        this.recordType = recordType;
-        recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+        recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
+        subscriptionIdListPrinterFactory =
+                new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
         executionTimeString = null;
     }
 
@@ -106,28 +106,18 @@
         return;
     }
 
-    private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
-        for (int i = 0; i < subscriptionIds.size(); i++) {
-            AUUID subId = (AUUID) subscriptionIds.getItem(i);
-            String subscriptionString = subId.toString();
-            //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
-            subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
-            subscriptionString = "\"" + subscriptionString + "\"";
-            sendData.get(endpoint).add(subscriptionString);
-        }
-    }
-
     public String createData(String endpoint) {
-        String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
-                + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
-                + executionTimeString + "\", \"subscriptionIds\":[";
-        for (String value : sendData.get(endpoint)) {
-            JSON += value;
-            JSON += ",";
+        String resultTitle = "\"subscriptionIds\"";
+        if (push) {
+            resultTitle = "\"results\"";
         }
-        JSON = JSON.substring(0, JSON.length() - 1);
-        JSON += "]}";
-        return JSON;
+        String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+                + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+                + executionTimeString + "\", " + resultTitle + ":[";
+        // sendData.get(endpoint) is the comma-separated result text buffered for this endpoint.
+        jsonStr += sendData.get(endpoint);
+        jsonStr += "]}";
+        return jsonStr;
 
     }
 
@@ -172,6 +162,11 @@
             eval1.evaluate(tRef, inputArg1);
             eval2.evaluate(tRef, inputArg2);
 
+            /*The incoming tuples have three fields:
+             1. eval0 will get the serialized broker endpoint string
+             2. eval1 will get the payload (either the subscriptionIds or entire results)
+             3. eval2 will get the channel execution time stamp (the same for all tuples)
+            */
             if (executionTimeString == null) {
                 int resultSetOffset = inputArg2.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
@@ -185,34 +180,51 @@
 
             int serBrokerOffset = inputArg0.getStartOffset();
             bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
-            String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
-            sendData.putIfAbsent(endpoint, new HashSet<>());
+            endpoint = stringSerDes.deserialize(di).getStringValue();
+            sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+            try {
+                sendStreams.putIfAbsent(endpoint,
+                        new PrintStream(sendbaos.get(endpoint), true, StandardCharsets.UTF_8.name()));
+            } catch (UnsupportedEncodingException e) {
+                throw new HyracksDataException(e.getMessage());
+            }
 
             if (push) {
                 int pushOffset = inputArg1.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
-                //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
-                pushRecord = recordSerDes.deserialize(di);
-                sendData.get(endpoint).add(pushRecord.toString());
+                // Comma-separate per endpoint: only when this endpoint's buffer already holds a result.
+                if (sendbaos.get(endpoint).size() > 0) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+                        sendStreams.get(endpoint));
 
             } else {
-                int serSubOffset = inputArg1.getStartOffset();
-                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
-                pushList = subSerDes.deserialize(di);
-                addSubscriptions(endpoint, pushList);
+                if (sendbaos.get(endpoint).size() > 0) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
+                        inputArg1.getLength(), sendStreams.get(endpoint));
             }
+            firstResult = false;
         }
 
     }
 
     @Override
     public void close() throws HyracksDataException {
-        for (String endpoint : sendData.keySet()) {
-            if (sendData.get(endpoint).size() > 0) {
-                sendGroupOfResults(endpoint);
-                sendData.get(endpoint).clear();
+        for (String endpoint : sendStreams.keySet()) {
+            sendData.put(endpoint, new String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+            sendGroupOfResults(endpoint);
+            sendStreams.get(endpoint).close();
+            try {
+                sendbaos.get(endpoint).close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e.getMessage());
             }
+
         }
+
         return;
     }
 
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
index d0b3087..308c8b1 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
@@ -1,17 +1,17 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$134, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$141, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$134(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$134, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$141(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$141, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$109(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$109]  |PARTITIONED|
+                -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$116]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
@@ -21,7 +21,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$104]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$111]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
@@ -33,12 +33,12 @@
                                                         -- STREAM_SELECT  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- HYBRID_HASH_JOIN [$$119][$$118]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$119]  |PARTITIONED|
+                                                              -- HYBRID_HASH_JOIN [$$126][$$125]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$126]  |PARTITIONED|
                                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$117, $$115][$$110, $$111]  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$117, $$115]  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$124, $$122][$$117, $$118]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$124, $$122]  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -46,24 +46,24 @@
                                                                                   -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                                     -- ASSIGN  |UNPARTITIONED|
                                                                                       -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$110, $$111]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                 -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$118]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$125]  |PARTITIONED|
                                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$112][$$128]  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$119][$$135]  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                           -- STREAM_SELECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                 -- BTREE_SEARCH  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- STABLE_SORT [$$138(ASC)]  |PARTITIONED|
+                                                                                    -- STABLE_SORT [$$145(ASC)]  |PARTITIONED|
                                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -71,7 +71,7 @@
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                 -- ASSIGN  |PARTITIONED|
                                                                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
                                                                           -- NESTED_LOOP  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                               -- UNION_ALL  |PARTITIONED|
@@ -127,4 +127,4 @@
                                                                                               -- BTREE_SEARCH  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                   -- ASSIGN  |PARTITIONED|
-                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
index a4370db..59805cc 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
@@ -1,17 +1,17 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$134, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$141, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$134(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$134, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$141(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$141, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$109(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$109]  |PARTITIONED|
+                -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$116]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
@@ -21,7 +21,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$104]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$111]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
@@ -33,12 +33,12 @@
                                                         -- STREAM_SELECT  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- HYBRID_HASH_JOIN [$$119][$$118]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$119]  |PARTITIONED|
+                                                              -- HYBRID_HASH_JOIN [$$126][$$125]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$126]  |PARTITIONED|
                                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$117, $$115][$$110, $$111]  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$117, $$115]  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$124, $$122][$$117, $$118]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$124, $$122]  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -46,24 +46,24 @@
                                                                                   -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                                     -- ASSIGN  |UNPARTITIONED|
                                                                                       -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$110, $$111]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                 -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$118]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$125]  |PARTITIONED|
                                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$112][$$128]  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$119][$$135]  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                           -- STREAM_SELECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                 -- BTREE_SEARCH  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- STABLE_SORT [$$138(ASC)]  |PARTITIONED|
+                                                                                    -- STABLE_SORT [$$145(ASC)]  |PARTITIONED|
                                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -71,7 +71,7 @@
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                 -- ASSIGN  |PARTITIONED|
                                                                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
                                                                           -- NESTED_LOOP  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                               -- STREAM_PROJECT  |PARTITIONED|
@@ -79,7 +79,7 @@
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- BTREE_SEARCH  |PARTITIONED|
                                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- STABLE_SORT [$$142(ASC)]  |PARTITIONED|
+                                                                                        -- STABLE_SORT [$$149(ASC)]  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                             -- STREAM_PROJECT  |PARTITIONED|
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -94,11 +94,11 @@
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                       -- BTREE_SEARCH  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- STABLE_SORT [$$145(ASC)]  |PARTITIONED|
+                                                                                          -- STABLE_SORT [$$152(ASC)]  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                   -- BTREE_SEARCH  |PARTITIONED|
                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                       -- ASSIGN  |PARTITIONED|
-                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
index 3e201a7..a080c79 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -1,17 +1,17 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$87, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$87(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$87, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$72(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$72]  |PARTITIONED|
+                -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
@@ -21,7 +21,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$66]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
@@ -35,8 +35,8 @@
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$79, $$77][$$73, $$74]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$79, $$77]  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$84]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -44,7 +44,7 @@
                                                                               -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                                 -- ASSIGN  |UNPARTITIONED|
                                                                                   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$73, $$74]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$80, $$81]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -58,4 +58,4 @@
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                       -- DATASOURCE_SCAN  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
index 653e42c..19dac15 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -1,17 +1,17 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$87, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$87(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$87, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$72(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$72]  |PARTITIONED|
+                -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
@@ -21,7 +21,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$66]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
@@ -35,8 +35,8 @@
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$79, $$77][$$73, $$74]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$79, $$77]  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$84]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -44,7 +44,7 @@
                                                                               -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                                 -- ASSIGN  |UNPARTITIONED|
                                                                                   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$73, $$74]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$80, $$81]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -65,11 +65,11 @@
       -- STREAM_PROJECT  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- INSERT_DELETE  |PARTITIONED|
-            -- HASH_PARTITION_EXCHANGE [$$4]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$12]  |PARTITIONED|
               -- ASSIGN  |UNPARTITIONED|
                 -- STREAM_PROJECT  |UNPARTITIONED|
                   -- ASSIGN  |UNPARTITIONED|
                     -- STREAM_PROJECT  |UNPARTITIONED|
                       -- ASSIGN  |UNPARTITIONED|
                         -- ASSIGN  |UNPARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
+                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
index b15c0d6..a847ef0 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -1,17 +1,17 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$87, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$87(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$87, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$72(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$72]  |PARTITIONED|
+                -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
@@ -21,7 +21,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$66]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
@@ -35,8 +35,8 @@
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$79, $$77][$$73, $$74]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$79, $$77]  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$84]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -44,7 +44,7 @@
                                                                               -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                                 -- ASSIGN  |UNPARTITIONED|
                                                                                   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$73, $$74]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$80, $$81]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -72,4 +72,4 @@
                     -- BTREE_SEARCH  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|