Add push-based channels and improve broker notifications

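Channels can now be declared with the optional "push" keyword, for
example (as exercised by the new optimizer test):

    create repetitive push channel EmergencyChannel
    using RecentEmergenciesNearUser@1 period duration("PT10S");

A push channel compiles its body as a plain query rather than an insert
into the channel's Results dataset, and delivers the result records
directly to the subscribed brokers. The broker HTTP notification logic
moves from ChannelJobService into NotifyBrokerRuntime, which batches
results per broker endpoint and sends one POST per endpoint when the
runtime closes.
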
Change-Id: Ie3c7cae0f015d6bc01dd912499565bb12c15abc3
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index 41853b9..3df9a76 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -18,51 +18,16 @@
  */
 package org.apache.asterix.bad;
 
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * Provides functionality for channel jobs and communicating with Brokers
+ * Provides functionality for channel jobs
  */
 public class ChannelJobService {
 
     private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
 
-    public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
-            AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
-        String formattedString;
-        formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
-        sendMessage(brokerEndpoint, formattedString);
-    }
-
-    public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) {
-        String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
-                + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
-                + channelExecutionTime + "\", \"subscriptionIds\":[";
-        for (int i = 0; i < subscriptionIds.size(); i++) {
-            AUUID subId = (AUUID) subscriptionIds.getItem(i);
-            String subscriptionString = subId.toString();
-            //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
-            subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
-            JSON += "\"" + subscriptionString + "\"";
-            if (i < subscriptionIds.size() - 1) {
-                JSON += ",";
-            }
-        }
-        JSON += "]}";
-        return JSON;
-
-    }
 
     public static long findPeriod(String duration) {
         //TODO: Allow Repetitive Channels to use YMD durations
@@ -92,61 +57,6 @@
         return (long) (seconds * 1000);
     }
 
-    public static void sendMessage(String targetURL, String urlParameters) {
-        HttpURLConnection connection = null;
-        try {
-            //Create connection
-            URL url = new URL(targetURL);
-            connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("POST");
-            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
-
-            connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
-            connection.setRequestProperty("Content-Language", "en-US");
-
-            connection.setUseCaches(false);
-            connection.setDoOutput(true);
-            connection.setConnectTimeout(500);
-
-            if (connection.getOutputStream() != null) {
-                //Send message
-                DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
-                wr.writeBytes(urlParameters);
-                wr.close();
-            } else {
-                throw new Exception();
-            }
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                int responseCode = connection.getResponseCode();
-                LOGGER.info("\nSending 'POST' request to URL : " + url);
-                LOGGER.info("Post parameters : " + urlParameters);
-                LOGGER.info("Response Code : " + responseCode);
-            }
-
-            if (connection.getInputStream() != null) {
-                BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
-                String inputLine;
-                StringBuffer response = new StringBuffer();
-                while ((inputLine = in.readLine()) != null) {
-                    response.append(inputLine);
-                }
-                in.close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.log(Level.INFO, response.toString());
-                }
-            } else {
-                LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker.");
-            }
-
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
-        } finally {
-            if (connection != null) {
-                connection.disconnect();
-            }
-        }
-    }
 
     @Override
     public String toString() {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 161f093..87ac320 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -40,7 +40,6 @@
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -48,6 +47,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
@@ -59,6 +59,7 @@
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,14 +93,16 @@
     private String subscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
+    private final boolean push;
 
     public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
-            Expression period) {
+            Expression period, boolean push) {
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.function = function;
         this.period = (CallExpr) period;
         this.duration = "";
+        this.push = push;
     }
 
     public Identifier getDataverseName() {
@@ -218,12 +221,37 @@
 
     }
 
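+    //Compiles the body of a push channel as a standalone query (no insert into a Results dataset);
+    //the broker notification plan is attached later by InsertBrokerNotifierForChannelRule.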
+    private JobSpecification compilePushChannel(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            if (bActiveTxn) {
+                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        return jobSpec;
+    }
+
     private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
         StringBuilder builder = new StringBuilder();
         builder.append("SET inline_with \"false\";\n");
-        builder.append("insert into " + dataverse + "." + resultsTableName);
-        builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
+        if (!push) {
+            builder.append("insert into " + dataverse + "." + resultsTableName);
+            builder.append(" as a (\n");
+        }
+        builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
         builder.append("select result, ");
         builder.append(BADConstants.ChannelExecutionTime + ", ");
         builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
@@ -238,15 +266,19 @@
         builder.append("sub.param" + i + ") result \n");
         builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
         builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
-        builder.append(")");
-        builder.append(" returning a");
+        if (!push) {
+            builder.append(")");
+            builder.append(" returning a");
+        }
         builder.append(";");
         BADParserFactory factory = new BADParserFactory();
         List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
 
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
-
+        if (push) {
+            return compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
+        }
         return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
                 hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index d83b606..9ead7f0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -74,133 +76,197 @@
         if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
+        boolean push = false;
+
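+        //A push channel has no insert/commit of results: they flow straight to DISTRIBUTE_RESULT,
+        //so the operator below it is a PROJECT rather than a DELEGATE wrapping a commit.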
         AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
-            return false;
+            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+                return false;
+            }
+            push = true;
         }
-        DelegateOperator eOp = (DelegateOperator) op;
-        if (!(eOp.getDelegate() instanceof CommitOperator)) {
-            return false;
-        }
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
-        if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
-            return false;
-        }
-        InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
-        if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
-            return false;
-        }
-        DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
-        String datasetName = dds.getDataset().getDatasetName();
-        if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
-                || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
-                || !datasetName.endsWith("Results")) {
-            return false;
-        }
-        String channelDataverse = dds.getDataset().getDataverseName();
-        //Now we know that we are inserting into results
+        DataSourceScanOperator subscriptionsScan;
+        String channelDataverse;
+        String channelName;
 
-        String channelName = datasetName.substring(0, datasetName.length() - 7);
-        String subscriptionsName = channelName + "Subscriptions";
-        //TODO: Can we check here to see if there is a channel with such a name?
+        if (!push) {
+            DelegateOperator eOp = (DelegateOperator) op;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+            if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                return false;
+            }
+            InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
+            if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
+                return false;
+            }
+            DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+            String datasetName = dds.getDataset().getDatasetName();
+            if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+                    || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
+                    || !datasetName.endsWith("Results")) {
+                return false;
+            }
+            channelDataverse = dds.getDataset().getDataverseName();
+            //Now we know that we are inserting into results
 
-        DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
-        if (subscriptionsScan == null) {
-            return false;
+            channelName = datasetName.substring(0, datasetName.length() - 7);
+            String subscriptionsName = channelName + "Subscriptions";
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+            if (subscriptionsScan == null) {
+                return false;
+            }
+
+        } else {
+            //if push, get the channel name here instead
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+            if (subscriptionsScan == null) {
+                return false;
+            }
+            DatasetDataSource dds = (DatasetDataSource) subscriptionsScan.getDataSource();
+            String datasetName = dds.getDataset().getDatasetName();
+            channelDataverse = dds.getDataset().getDataverseName();
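+            //Strip the "Subscriptions" suffix (13 characters) from the dataset name to get the channel name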
+            channelName = datasetName.substring(0, datasetName.length() - 13);
         }
 
-        //Now we want to make sure and set the commit to be a nonsink commit
-        ((CommitOperator) eOp.getDelegate()).setSink(false);
-
-        //Now we need to get the broker EndPoint 
+        //Now we need to get the broker EndPoint
         LogicalVariable brokerEndpointVar = context.newVar();
         AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
-        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
-        //now brokerNameVar holds the brokerName for use farther up in the plan
-
-        context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
-        context.computeAndSetTypeEnvironmentForOperator(eOp);
+        if (opAboveBrokersScan == null) {
+            return false;
+        }
 
         //get subscriptionIdVar
         LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
 
         //The channelExecutionTime is created just before the scan
-        LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
-                .getVariables().get(0);
+        ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
+        if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        LogicalVariable channelExecutionVar = ((AssignOperator) channelExecutionAssign).getVariables().get(0);
+        if (!channelExecutionVar.toString().equals("$$" + BADConstants.ChannelExecutionTime)) {
+            return false;
+        }
 
-        ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
+        if (!push) {
+            ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
+        }
+
+        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
+        //now brokerEndpointVar holds the broker's endpoint for use farther up in the plan
+
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
+        context.computeAndSetTypeEnvironmentForOperator(op);
+
+        ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
         badProject.getVariables().add(subscriptionIdVar);
         badProject.getVariables().add(brokerEndpointVar);
         badProject.getVariables().add(channelExecutionVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
         //Create my brokerNotify plan above the extension Operator
-        DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
-                context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
+        DelegateOperator dOp = push
+                ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
+                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
+                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+                        (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
 
         return true;
     }
 
-    private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
-            LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
-            ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
-                    throws AlgebricksException {
-        //create the Distinct Op
-        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
-        VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar);
-        expressions.add(new MutableObject<ILogicalExpression>(vExpr));
-        DistinctOperator distinctOp = new DistinctOperator(expressions);
-
-        //create the GroupBy Op
-        //And set the distinct as input
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-        List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
-
-        //create group by operator
-        GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
-        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
-        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
-        groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
-
-        //create nested plan for subscription ids in group by
-        NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
-                new MutableObject<ILogicalOperator>(groupbyOp));
-        //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
-        //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
-        LogicalVariable subscriptionListVar = context.newVar();
-        List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
-        aggVars.add(subscriptionListVar);
-        AggregateFunctionCallExpression funAgg = BuiltinFunctions.makeAggregateFunctionExpression(
-                BuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
-        funAgg.getArguments()
-                .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
-        List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-        aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
-        AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
-        listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
-
-        //add nested plans
-        nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
-
-        //Create the NotifyBrokerOperator
-        NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
-                channelExecutionVar);
+    private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
+            LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
+            IAType resultType) {
+        NotifyBrokerOperator notifyBrokerOp =
+                new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType);
         EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
         NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
         notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
         DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
         extensionOp.setPhysicalOperator(notifyBrokerPOp);
-        extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
+        return extensionOp;
+    }
 
-        //Set the input for the brokerNotify as the replicate operator
-        distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+    private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
+            LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
+            DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+            throws AlgebricksException {
+        //Find the assign operator to get the result type that we need
+        AbstractLogicalOperator assign = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+        while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue();
+        }
+        IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context);
+        IAType resultType = (IAType) env.getVarType(sendVar);
+
+        //Create the NotifyBrokerOperator
+        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse,
+                channelName, true, resultType);
+
+        extensionOp.getInputs().add(new MutableObject<>(eOp));
+        context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+        return extensionOp;
+
+    }
+
+    private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar,
+            LogicalVariable sendVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
+            ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+                    throws AlgebricksException {
+
+        //Create the Distinct Op
+        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<>();
+        VariableReferenceExpression vExpr = new VariableReferenceExpression(sendVar);
+        expressions.add(new MutableObject<>(vExpr));
+        DistinctOperator distinctOp = new DistinctOperator(expressions);
+
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>();
+        List<ILogicalPlan> nestedPlans = new ArrayList<>();
+
+        //Create GroupBy operator
+        GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
+
+        //Set the distinct as input
+        groupbyOp.getInputs().add(new MutableObject<>(distinctOp));
+
+        //create nested plan for subscription ids in group by
+        NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(new MutableObject<>(groupbyOp));
+        LogicalVariable sendListVar = context.newVar();
+        List<LogicalVariable> aggVars = new ArrayList<>();
+        aggVars.add(sendListVar);
+        AggregateFunctionCallExpression funAgg =
+                BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.LISTIFY, new ArrayList<>());
+        funAgg.getArguments().add(new MutableObject<>(new VariableReferenceExpression(sendVar)));
+        List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<>();
+        aggExpressions.add(new MutableObject<>(funAgg));
+        AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
+        listifyOp.getInputs().add(new MutableObject<>(nestedTupleSourceOp));
+
+        //add nested plans
+        nestedPlans.add(new ALogicalPlanImpl(new MutableObject<>(listifyOp)));
+
+        //Create the NotifyBrokerOperator
+        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
+                channelDataverse, channelName, false, null);
+
+        //Set the input for the distinct as the old top
+        extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
+        distinctOp.getInputs().add(new MutableObject<>(eOp));
 
         //compute environment bottom up
-
         context.computeAndSetTypeEnvironmentForOperator(distinctOp);
         context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
         context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
@@ -211,7 +277,6 @@
 
     }
 
-    @SuppressWarnings("unchecked")
     private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
             AbstractLogicalOperator opAboveBrokersScan) {
         Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
@@ -244,9 +309,10 @@
         return assignOp;
     }
 
-    /*This function searches for the needed op
-     * If lookingForBrokers, find the op above the brokers scan
-     * Else find the suscbriptionsScan
+    /* This function is used to find one of several specific operators within the plan:
+     * A. The operator directly above the brokers dataset scan
+     * B. The subscriptions scan
+     * C. The highest project of the plan
      */
     private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
         if (!op.hasInputs()) {
@@ -311,7 +377,7 @@
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
                 if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
                         && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
-                    if (dds.getDataset().getDatasetName().equals(subscriptionsName)) {
+                    if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
                         return true;
                     }
                 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index d281b49..df0f0f4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -20,6 +20,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
@@ -27,22 +28,26 @@
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
 /**
- * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ * An operator for sending broker notifications
  */
 public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
-    private final LogicalVariable subscriptionIdVar;
     private final LogicalVariable brokerEndpointVar;
     private final LogicalVariable channelExecutionVar;
+    private final LogicalVariable pushListVar;
+    private final boolean push;
+    private final IAType recordType;
 
-    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
-            LogicalVariable resultSetVar) {
+    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar,
+            LogicalVariable resultSetVar, boolean push, IAType recordType) {
         this.brokerEndpointVar = brokerEndpointVar;
-        this.subscriptionIdVar = subscriptionIdVar;
         this.channelExecutionVar = resultSetVar;
+        this.pushListVar = pushListVar;
+        this.push = push;
+        this.recordType = recordType;
     }
 
-    public LogicalVariable getSubscriptionVariable() {
-        return subscriptionIdVar;
+    public LogicalVariable getPushListVar() {
+        return pushListVar;
     }
 
     public LogicalVariable getBrokerEndpointVariable() {
@@ -53,9 +58,18 @@
         return channelExecutionVar;
     }
 
+    public IAType getRecordType() {
+        return recordType;
+    }
+
+    public boolean getPush() {
+        return push;
+    }
+
     @Override
     public String toString() {
-        return "notify-brokers";
+        return "notify-brokers (" + brokerEndpointVar.toString() + "," + channelExecutionVar.toString() + ","
+                + pushListVar.toString() + ")";
     }
 
     @Override
@@ -65,7 +79,7 @@
 
     @Override
     public IOperatorDelegate newInstance() {
-        return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+        return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push, recordType);
     }
 
     @Override
@@ -76,7 +90,7 @@
 
     @Override
     public void getUsedVariables(Collection<LogicalVariable> usedVars) {
-        usedVars.add(subscriptionIdVar);
+        usedVars.add(pushListVar);
         usedVars.add(brokerEndpointVar);
         usedVars.add(channelExecutionVar);
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 12d5ae2..b9cfbfd 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.bad.runtime;
 
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -74,20 +75,22 @@
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
                     throws AlgebricksException {
         DelegateOperator notify = (DelegateOperator) op;
-        LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
+        LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
         LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
         LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+        IAType recordType = ((NotifyBrokerOperator) notify.getDelegate()).getRecordType();
+        boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();
 
         int brokerColumn = inputSchemas[0].findVariable(brokerVar);
-        int subColumn = inputSchemas[0].findVariable(subVar);
+        int pushColumn = inputSchemas[0].findVariable(pushListVar);
         int executionColumn = inputSchemas[0].findVariable(executionVar);
 
         IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
-        IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
+        IScalarEvaluatorFactory pushListEvalFactory = new ColumnAccessEvalFactory(pushColumn);
         IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
 
-        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
-                channelExecutionEvalFactory, entityId);
+        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory,
+                channelExecutionEvalFactory, entityId, push, recordType);
 
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
                 context);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 5d51926..6ffb244 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -20,21 +20,33 @@
 package org.apache.asterix.bad.runtime;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AUUID;
 import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -48,11 +60,13 @@
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+    private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
     private final AOrderedListSerializerDeserializer subSerDes =
             new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
+    private final ARecordSerializerDeserializer recordSerDes;
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -62,17 +76,29 @@
     private IScalarEvaluator eval2;
     private final ActiveManager activeManager;
     private final EntityId entityId;
+    private final boolean push;
+    private AOrderedList pushList;
+    private ARecord pushRecord;
+    private final IAType recordType;
+    private final Map<String, HashSet<String>> sendData = new HashMap<>();
+    private String executionTimeString;
 
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
-            IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
-            EntityId activeJobId) throws HyracksDataException {
+            IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId activeJobId, boolean push, IAType recordType) throws HyracksDataException {
         this.tRef = new FrameTupleReference();
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
-        eval1 = subEvalFactory.createScalarEvaluator(ctx);
+        eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
         this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
+        this.push = push;
+        this.pushList = null;
+        this.pushRecord = null;
+        this.recordType = recordType;
+        recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+        executionTimeString = null;
     }
 
     @Override
@@ -80,6 +106,61 @@
         return;
     }
 
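+    //Results and subscription ids are grouped per broker endpoint in sendData and flushed
+    //as a single HTTP POST per endpoint when the runtime closes.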
+    private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
+        for (int i = 0; i < subscriptionIds.size(); i++) {
+            AUUID subId = (AUUID) subscriptionIds.getItem(i);
+            String subscriptionString = subId.toString();
+            //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
+            subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
+            subscriptionString = "\"" + subscriptionString + "\"";
+            sendData.get(endpoint).add(subscriptionString);
+        }
+    }
+
+    public String createData(String endpoint) {
+        StringBuilder json = new StringBuilder();
+        json.append("{ \"dataverseName\":\"").append(entityId.getDataverse()).append("\", \"channelName\":\"")
+                .append(entityId.getEntityName()).append("\", \"").append(BADConstants.ChannelExecutionTime)
+                .append("\":\"").append(executionTimeString).append("\", \"subscriptionIds\":[");
+        json.append(String.join(",", sendData.get(endpoint)));
+        json.append("]}");
+        return json.toString();
+    }
+
+    private void sendGroupOfResults(String endpoint) {
+        String urlParameters = createData(endpoint);
+        try {
+            //Create connection
+            URL url = new URL(endpoint);
+            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+
+            connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
+            connection.setRequestProperty("Content-Language", "en-US");
+
+            connection.setUseCaches(false);
+            connection.setDoOutput(true);
+            connection.setConnectTimeout(500);
+            DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+            wr.writeBytes(urlParameters);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                int responseCode = connection.getResponseCode();
+                LOGGER.info("\nSending 'POST' request to URL : " + url);
+                LOGGER.info("Post parameters : " + urlParameters);
+                LOGGER.info("Response Code : " + responseCode);
+            }
+            wr.close();
+            connection.disconnect();
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Channel failed to connect to Broker.", e);
+        }
+    }
+
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         tAccess.reset(buffer);
@@ -91,33 +172,47 @@
             eval1.evaluate(tRef, inputArg1);
             eval2.evaluate(tRef, inputArg2);
 
-            int serBrokerOffset = inputArg0.getStartOffset();
-            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
-            AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
-
-            int serSubOffset = inputArg1.getStartOffset();
-            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
-            AOrderedList subs = subSerDes.deserialize(di);
-
-            int resultSetOffset = inputArg2.getStartOffset();
-            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
-            ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
-            String executionTimeString;
-            try {
-                executionTimeString = executionTime.toSimpleString();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
+            if (executionTimeString == null) {
+                int resultSetOffset = inputArg2.getStartOffset();
+                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
+                ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+                try {
+                    executionTimeString = executionTime.toSimpleString();
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
             }
 
-            ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
-                    executionTimeString);
+            int serBrokerOffset = inputArg0.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
+            String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
+            sendData.putIfAbsent(endpoint, new HashSet<>());
 
+            if (push) {
+                int pushOffset = inputArg1.getStartOffset();
+                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
+                //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
+                pushRecord = recordSerDes.deserialize(di);
+                sendData.get(endpoint).add(pushRecord.toString());
+
+            } else {
+                int serSubOffset = inputArg1.getStartOffset();
+                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
+                pushList = subSerDes.deserialize(di);
+                addSubscriptions(endpoint, pushList);
+            }
         }
 
     }
 
     @Override
     public void close() throws HyracksDataException {
+        for (String endpoint : sendData.keySet()) {
+            if (sendData.get(endpoint).size() > 0) {
+                sendGroupOfResults(endpoint);
+                sendData.get(endpoint).clear();
+            }
+        }
         return;
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
index 0e2be8b..a7f12ba 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.bad.runtime;
 
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -31,16 +32,21 @@
     private static final long serialVersionUID = 1L;
 
     private final IScalarEvaluatorFactory brokerEvalFactory;
-    private final IScalarEvaluatorFactory subEvalFactory;
+    private final IScalarEvaluatorFactory pushListEvalFactory;
     private final IScalarEvaluatorFactory channelExecutionEvalFactory;
     private final EntityId entityId;
+    private final boolean push;
+    private final IAType recordType;
 
-    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
-            IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory,
+            IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId entityId, boolean push, IAType recordType) {
         this.brokerEvalFactory = brokerEvalFactory;
-        this.subEvalFactory = subEvalFactory;
+        this.pushListEvalFactory = pushListEvalFactory;
         this.channelExecutionEvalFactory = channelExecutionEvalFactory;
         this.entityId = entityId;
+        this.push = push;
+        this.recordType = recordType;
     }
 
     @Override
@@ -50,7 +56,7 @@
 
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-        return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory,
-                channelExecutionEvalFactory, entityId) };
+        return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, pushListEvalFactory,
+                channelExecutionEvalFactory, entityId, push, recordType) };
     }
 }
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 02aba78..2d7ba75 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -101,15 +101,18 @@
   CreateChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
+  boolean push = false;
 }
 {
   (
-    "repetitive" "channel"  nameComponents = QualifiedName()
+    "repetitive"
+    ( "push" { push = true; } )?
+     "channel"  nameComponents = QualifiedName()
     <USING> appliedFunction = FunctionSignature()
     "period" period = FunctionCallExpr()
     {
       ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, period);
+                                   nameComponents.second, appliedFunction, period, push);
     }
   )
     {
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
new file mode 100644
index 0000000..e20638b
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Check a push-based channel plan
+ * Expected Res : Success
+ * Date         : Mar 2018
+ */
+
+drop dataverse channels7 if exists;
+create dataverse channels7;
+use channels7;
+
+create type UserLocation as {
+    location: circle,
+    userName: string,
+    timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+    location: circle,
+    userName: string
+};
+
+create type EmergencyReport as {
+    reportId: uuid,
+    Etype: string,
+    location: circle,
+    timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+    Etype: string,
+    location: circle
+};
+
+
+create type EmergencyShelter as {
+    shelterName: string,
+    location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+    select report, shelters from
+     ( select value r from Reports r where r.timeStamp >
+     current_datetime() - day_time_duration("PT10S")) report,
+    UserLocations u
+    let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+    where u.userName = userName
+    and spatial_intersect(report.location,u.location)
+  )
+};
+
+write output to nc1:"rttest/channel-push.sqlpp";
+
+create repetitive push channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
new file mode 100644
index 0000000..770617f
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
@@ -0,0 +1,64 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- HYBRID_HASH_JOIN [$$128][$$135]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- HYBRID_HASH_JOIN [$$126, $$124][$$117, $$118]  |PARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$126, $$124]  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
+                    -- NESTED_LOOP  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                        -- PRE_CLUSTERED_GROUP_BY[$$120]  |PARTITIONED|
+                                {
+                                  -- AGGREGATE  |LOCAL|
+                                    -- STREAM_SELECT  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                }
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STABLE_SORT [$$120(ASC)]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
+                                -- NESTED_LOOP  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file