Better push version: emit one subscriptionId per result instead of grouping subscription ids per broker
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 76e3e82..9a2c6b1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -257,9 +257,9 @@
      *
      * push version:
      * SET inline_with "false";
-     * select value {"payload": {"result":r, "subscriptionId": brokerSubId}} from (
+     * select value {"payload": {"result":result, "subscriptionId": brokerSubId}} from (
      * with channelExecutionTime as current_datetime()
-     * select b.BrokerEndPoint, result as r, bs.brokerSubId as brokerSubId, channelExecutionTime
+     * select b.BrokerEndPoint, result, bs.brokerSubId as brokerSubId, channelExecutionTime
      * from steven.EmergencyChannelChannelSubscriptions sub,
      * steven.RecentEmergenciesNearUser(sub.param0) result,
      * steven.EmergencyChannelBrokerSubscriptions bs,
@@ -291,7 +291,7 @@
         } else {
             builder.append(
                     "select value {\"payload\": {\"" + FUNCTION_RESULT_VAR + "\":" + FUNCTION_RESULT_VAR
-                            + ", \"subscriptionIds\": brokerSubIds}} from (\n");
+                            + ", \"subscriptionId\": brokerSubId}} from (\n");
             builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
             builder.append(
                     "select " + BROKER_RECORD_VAR + "." + BADConstants.BrokerEndPoint + ", " + FUNCTION_RESULT_VAR
@@ -334,10 +334,7 @@
             builder.append(" returning " + INSERTED_RECORD_VAR);
 
         } else {
-            builder.append(") results\n");
-            builder.append("group by " + BADConstants.BrokerEndPoint + "," + FUNCTION_RESULT_VAR + ", "
-                    + BADConstants.ChannelExecutionTime);
-            builder.append(" group as brokerSubIds (" + BADConstants.BrokerSubscriptionId + " as subscriptionId)");
+            builder.append(") results");
         }
 
         builder.append(";");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 18cd41a..2596421 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -92,9 +92,9 @@
         String channelName;
         AssignOperator pushAssign = null;
         AssignOperator newAssign = null;
-        GroupByOperator pushGroupBy = null;
+        UnnestOperator pushUnnest = null;
         LogicalVariable brokerSubsVar = null;
-        LogicalVariable brokerEndpoint = null;
+        LogicalVariable brokerEndpoint = context.newVar();
         LogicalVariable brokerSubId = null;
 
         if (!push) {
@@ -138,12 +138,6 @@
             }
             pushAssign = (AssignOperator) op2;
 
-            AbstractLogicalOperator op3 = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue();
-            if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) {
-                return false;
-            }
-            pushGroupBy = (GroupByOperator) op3;
-
             //if push, get the channel name here instead
             subscriptionsScan = (DataSourceScanOperator) findOp(op, LogicalOperatorTag.DATASOURCESCAN, "",
                     BADConstants.ChannelSubscriptionsType);
@@ -154,7 +148,13 @@
             String datasetName = dds.getDataset().getDatasetName();
             channelDataverse = dds.getDataset().getDataverseName();
             channelName = datasetName.substring(0, datasetName.length() - 13);
-            brokerEndpoint = pushGroupBy.getGroupByList().get(0).first;
+
+            AbstractLogicalOperator unnest = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue();
+            if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+                return false;
+            }
+
+            pushUnnest = (UnnestOperator) unnest;
         }
 
         //The channelExecutionTime is created just before the scan
@@ -173,7 +173,6 @@
             if (subplanOperator == null) {
                 return false;
             }
-            brokerEndpoint = context.newVar();
             brokerSubId = context.newVar();
             brokerSubsVar = ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
                     .getVariables().get(0);
@@ -181,19 +180,18 @@
             newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context);
 
             context.computeAndSetTypeEnvironmentForOperator(op1);
-        } else {
-            channelExecutionVar = pushGroupBy.getGroupByList().get(2).first;
+
+            //Maintain the variables through the existing project
+            ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", "");
+            badProject.getVariables().add(channelExecutionVar);
+            badProject.getVariables().add(brokerSubsVar);
+            context.computeAndSetTypeEnvironmentForOperator(badProject);
         }
-        //Maintain the variables through the existing project
-        ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", "");
-        badProject.getVariables().add(channelExecutionVar);
-        badProject.getVariables().add(push ? brokerEndpoint : brokerSubsVar);
-        context.computeAndSetTypeEnvironmentForOperator(badProject);
 
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpoint, channelExecutionVar, context, pushAssign, channelDataverse,
-                        channelName)
+                        channelName, pushUnnest)
                 : createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign,
                         channelDataverse, channelName);
 
@@ -262,7 +260,7 @@
 
     private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar,
             LogicalVariable channelExecutionVar, IOptimizationContext context, AssignOperator payLoadAssign,
-            String channelDataverse, String channelName)
+            String channelDataverse, String channelName, UnnestOperator pushUnnest)
             throws AlgebricksException {
         IVariableTypeEnvironment env = payLoadAssign.computeOutputTypeEnvironment(context);
         IAType resultType = (IAType) env.getVarType(payLoadAssign.getVariables().get(0));
@@ -273,6 +271,21 @@
                 channelName, true, resultType);
 
         extensionOp.getInputs().add(new MutableObject<>(payLoadAssign));
+
+        FunctionInfo finfoGetField = (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+        ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(pushUnnest.getVariable())), new MutableObject<>(
+                        new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
+
+        AssignOperator assignBrokerEndPoint =
+                new AssignOperator(brokerEndpointVar, new MutableObject<>(getBrokerEndPoint));
+
+        assignBrokerEndPoint.getInputs().addAll(payLoadAssign.getInputs());
+        payLoadAssign.getInputs().clear();
+        payLoadAssign.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
+
+        context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
+        context.computeAndSetTypeEnvironmentForOperator(payLoadAssign);
         context.computeAndSetTypeEnvironmentForOperator(extensionOp);
 
         return extensionOp;
@@ -376,9 +389,17 @@
                 } else if (searchTag == LogicalOperatorTag.PROJECT) {
                     return (AbstractLogicalOperator) subOp.getValue();
 
-                } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN && isSubscriptionsScan(
+                } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN) {
+                    if (isSubscriptionsScan(
                         (AbstractLogicalOperator) subOp.getValue(), subscriptionsName, subscriptionType)) {
-                    return (AbstractLogicalOperator) subOp.getValue();
+                        return (AbstractLogicalOperator) subOp.getValue();
+                    } else {
+                        AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
+                                subscriptionsName, subscriptionType);
+                        if (nestedOp != null) {
+                            return nestedOp;
+                        }
+                    }
 
                 } else if (searchTag == LogicalOperatorTag.DELEGATE_OPERATOR) {
                     DelegateOperator dOp = (DelegateOperator) subOp.getValue();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 8e07af2..b604f75 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -190,8 +190,6 @@
             }
 
             if (push) {
-                int pushOffset = inputArg1.getStartOffset();
-                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
                 if (!firstResult) {
                     sendStreams.get(endpoint).append(',');
                 }