Merge branch 'master' into results
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 0467f6e..db4fc2d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -19,7 +19,8 @@
 package org.apache.asterix.bad;
 
 public interface BADConstants {
-    String SubscriptionId = "subscriptionId";
+    String ChannelSubscriptionId = "channelSubId";
+    String BrokerSubscriptionId = "brokerSubId";
     String BrokerName = "BrokerName";
     String ChannelName = "ChannelName";
     String ProcedureName = "ProcedureName";
@@ -29,9 +30,11 @@
     String ResultId = "resultId";
     String ChannelExecutionTime = "channelExecutionTime";
     String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+    String BrokerSubscriptionsType = "BrokerSubscriptionsType";
     String ChannelResultsType = "ChannelResultsType";
     String ResultsDatasetName = "ResultsDatasetName";
-    String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+    String ChannelSubscriptionsDatasetName = "ChannelSubscriptionsDatasetName";
+    String BrokerSubscriptionsDatasetName = "BrokerSubscriptionsDatasetName";
     String CHANNEL_EXTENSION_NAME = "Channel";
     String PROCEDURE_KEYWORD = "Procedure";
     String BROKER_KEYWORD = "Broker";
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 8403fcc..5d9d971 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -142,7 +142,11 @@
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
             tempMdProvider.getLocks().reset();
             dropStmt = new DropDatasetStatement(new Identifier(dataverse),
-                    new Identifier(channel.getSubscriptionsDataset()), true);
+                    new Identifier(channel.getChannelSubscriptionsDataset()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
+            tempMdProvider.getLocks().reset();
+            dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getBrokerSubscriptionsDataset()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7583f0b..5803f60 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -138,7 +138,7 @@
                 throw new AsterixException("There is no broker with this name " + brokerName + ".");
             }
 
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
 
             if (argList.size() != channel.getFunction().getArity()) {
                 throw new AsterixException("Channel expected " + channel.getFunction().getArity()
@@ -157,7 +157,7 @@
             fb.add(new FieldBinding(leftExpr, rightExpr));
 
             if (subscriptionId != null) {
-                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.ChannelSubscriptionId));
 
                 List<Expression> UUIDList = new ArrayList<>();
                 UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
@@ -190,7 +190,8 @@
                 VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
                 VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
                 useResultVar.setIsNewVar(false);
-                FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));
+                FieldAccessor accessor =
+                        new FieldAccessor(useResultVar, new Identifier(BADConstants.ChannelSubscriptionId));
 
                 metadataProvider.setResultSetId(new ResultSetId(resultSetId));
                 boolean resultsAsync =
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index b23bf3b..e6ba69c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -113,11 +113,11 @@
                 throw new AsterixException("There is no channel with this name " + channelName + ".");
             }
 
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
 
             //Need a condition to say subscription-id = sid
             OperatorExpr condition = new OperatorExpr();
-            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.ChannelSubscriptionId));
             condition.addOperand(fa);
             condition.setCurrentop(true);
             condition.addOperator("=");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a28666a..b2ffa1b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -85,7 +85,8 @@
     private Identifier dataverseName;
     private String duration;
     private String body;
-    private String subscriptionsTableName;
+    private String channelSubscriptionsTableName;
+    private String brokerSubscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
     private final boolean push;
@@ -113,7 +114,7 @@
     }
 
     public String getSubscriptionsName() {
-        return subscriptionsTableName;
+        return channelSubscriptionsTableName;
     }
 
     public String getDuration() {
@@ -159,29 +160,53 @@
     private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
 
+        //Create channel subscriptions dataset
         Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
         Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
         //Setup the subscriptions dataset
         List<List<String>> partitionFields = new ArrayList<>();
         List<Integer> keyIndicators = new ArrayList<>();
         keyIndicators.add(0);
-        List<String> fieldNames = new ArrayList<>();
-        fieldNames.add(BADConstants.SubscriptionId);
-        partitionFields.add(fieldNames);
-        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
-        DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
-                new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
-                new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+        List<String> fieldName = new ArrayList<>();
+        fieldName.add(BADConstants.ChannelSubscriptionId);
+        partitionFields.add(fieldName);
+        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+        DatasetDecl createChannelSubscriptionsDataset =
+                new DatasetDecl(dataverseName, new Identifier(channelSubscriptionsTableName),
+                        new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
 
-        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+                createChannelSubscriptionsDataset, hcc, null);
+
+        //Create broker subscriptions dataset
+        Identifier brokerSubscriptionsTypeName = new Identifier(BADConstants.BrokerSubscriptionsType);
+        partitionFields = new ArrayList<>();
+        keyIndicators = new ArrayList<>();
+        keyIndicators.add(0);
+        keyIndicators.add(0);
+        fieldName = new ArrayList<>();
+        List<String> fieldName2 = new ArrayList<>();
+        fieldName.add(BADConstants.ChannelSubscriptionId);
+        fieldName2.add(BADConstants.BrokerSubscriptionId);
+        partitionFields.add(fieldName);
+        partitionFields.add(fieldName2);
+        idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+        DatasetDecl createBrokerSubscriptionsDataset =
+                new DatasetDecl(dataverseName, new Identifier(brokerSubscriptionsTableName),
+                        new Identifier(BADConstants.BAD_DATAVERSE_NAME), brokerSubscriptionsTypeName, null, null, null,
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+        metadataProvider.getLocks().reset();
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+                createBrokerSubscriptionsDataset,
                 hcc, null);
 
         if (!push) {
             //Setup the results dataset
             partitionFields = new ArrayList<>();
-            fieldNames = new ArrayList<>();
-            fieldNames.add(BADConstants.ResultId);
-            partitionFields.add(fieldNames);
+            fieldName = new ArrayList<>();
+            fieldName.add(BADConstants.ResultId);
+            partitionFields.add(fieldName);
             idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
             DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
                     new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
@@ -225,9 +250,10 @@
         builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
         builder.append("select result, ");
         builder.append(BADConstants.ChannelExecutionTime + ", ");
-        builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
+        builder.append("sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
         builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
-        builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
+        builder.append("from " + dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+        builder.append(dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
         builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
         builder.append(function.getNamespace() + "." + function.getName() + "(");
         int i = 0;
@@ -235,8 +261,6 @@
             builder.append("sub.param" + i + ",");
         }
         builder.append("sub.param" + i + ") result \n");
-        builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
-        builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
         if (!push) {
             builder.append(")");
             builder.append(" returning a");
@@ -269,7 +293,9 @@
 
         dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
         dataverse = dataverseName.getValue();
-        subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
+        channelSubscriptionsTableName =
+                channelName + BADConstants.CHANNEL_EXTENSION_NAME + BADConstants.subscriptionEnding;
+        brokerSubscriptionsTableName = channelName + BADConstants.BROKER_KEYWORD + BADConstants.subscriptionEnding;
         resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
 
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
@@ -297,7 +323,10 @@
             initialize(mdTxnCtx);
 
             //check if names are available before creating anything
-            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, channelSubscriptionsTableName) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, brokerSubscriptionsTableName) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
             if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
@@ -323,7 +352,8 @@
 
             BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
                     duration);
-            channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+            channel = new Channel(dataverse, channelName.getValue(), channelSubscriptionsTableName,
+                    brokerSubscriptionsTableName, resultsTableName, function,
                     duration, null, body);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd2ff86..5be2e84 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -47,8 +47,12 @@
     public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
             NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
 
-    public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+    public static final Datatype BAD_CHANNEL_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
             BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+
+    public static final Datatype BAD_BROKER_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.BrokerSubscriptionsType, BADMetadataRecordTypes.brokerSubscriptionsType, false);
+
     public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
             BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
 
@@ -76,7 +80,6 @@
         return new MetadataTupleTranslatorProvider();
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public List<ExtensionMetadataDataset> getExtensionIndexes() {
         try {
@@ -107,7 +110,8 @@
                 // MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
                 // insert default data type
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_SUBSCRIPTION_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index a764a5a..f6e2e1f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -27,16 +27,24 @@
 
 public class BADMetadataRecordTypes {
 
-    // -------------------------------------- Subscriptions --------------------------------------//
-    private static final String[] subTypeFieldNames =
-            { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.SubscriptionId };
-    private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+    // -------------------------------------- Channel Subscriptions --------------------------------------//
+    private static final String[] channelSubTypeFieldNames = { BADConstants.ChannelSubscriptionId };
+    private static final IAType[] channelSubTypeFieldTypes = { BuiltinType.AUUID };
     public static final ARecordType channelSubscriptionsType =
-            new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true);
+            new ARecordType(BADConstants.ChannelSubscriptionsType, channelSubTypeFieldNames, channelSubTypeFieldTypes,
+                    true);
+
+    // -------------------------------------- Broker Subscriptions --------------------------------------//
+    private static final String[] brokerSubTypeFieldNames = { BADConstants.ChannelSubscriptionId,
+            BADConstants.BrokerSubscriptionId, BADConstants.DataverseName, BADConstants.BrokerName };
+    private static final IAType[] brokerSubTypeFieldTypes =
+            { BuiltinType.AUUID, BuiltinType.AUUID, BuiltinType.ASTRING, BuiltinType.ASTRING };
+    public static final ARecordType brokerSubscriptionsType = new ARecordType(BADConstants.BrokerSubscriptionsType,
+            brokerSubTypeFieldNames, brokerSubTypeFieldTypes, true);
 
     // ---------------------------------------- Results --------------------------------------------//
     private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
-            BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+            BADConstants.ChannelSubscriptionId, BADConstants.DeliveryTime };
     private static final IAType[] resultTypeFieldTypes =
             { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, BuiltinType.ADATETIME };
     public static final ARecordType channelResultsType =
@@ -45,25 +53,28 @@
     //------------------------------------------ Channel ----------------------------------------//     
     public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
-    public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+    public static final int CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
     public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
     public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
     public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
+    public static final int CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX = 8;
     public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_CHANNEL,
             // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+            new String[] { BADConstants.DataverseName, BADConstants.ChannelName,
+                    BADConstants.ChannelSubscriptionsDatasetName,
                     BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
+                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY,
+                    BADConstants.BrokerSubscriptionsDatasetName },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
                     new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
                             null),
-                    BuiltinType.ASTRING },
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
             //IsOpen?
             true);
     //------------------------------------------ Broker ----------------------------------------//
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index ed9346c..61df9af 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -34,7 +34,8 @@
 
     /** A unique identifier for the channel */
     protected final EntityId channelId;
-    private final String subscriptionsDatasetName;
+    private final String channelSubscriptionsDatasetName;
+    private final String brokerSubscriptionsDatasetName;
     private final String resultsDatasetName;
     private final String duration;
     private final String channelBody;
@@ -49,13 +50,15 @@
     */
     private final List<List<List<String>>> dependencies;
 
-    public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+    public Channel(String dataverseName, String channelName, String channelSubscriptionsDatasetName,
+            String brokerSubscriptionsDatasetName, String resultsDataset,
             FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
         this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
-        this.subscriptionsDatasetName = subscriptionsDataset;
+        this.channelSubscriptionsDatasetName = channelSubscriptionsDatasetName;
+        this.brokerSubscriptionsDatasetName = brokerSubscriptionsDatasetName;
         this.channelBody = channelBody;
         if (this.function.getNamespace() == null) {
             this.function.setNamespace(dataverseName);
@@ -67,7 +70,7 @@
             this.dependencies.add(new ArrayList<>());
             this.dependencies.add(new ArrayList<>());
             List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName);
-            List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName);
+            List<String> subscriptionList = Arrays.asList(dataverseName, channelSubscriptionsDatasetName);
             this.dependencies.get(0).add(resultsList);
             this.dependencies.get(0).add(subscriptionList);
             this.dependencies.get(1).add(functionAsPath);
@@ -84,8 +87,12 @@
         return dependencies;
     }
 
-    public String getSubscriptionsDataset() {
-        return subscriptionsDatasetName;
+    public String getChannelSubscriptionsDataset() {
+        return channelSubscriptionsDatasetName;
+    }
+
+    public String getBrokerSubscriptionsDataset() {
+        return brokerSubscriptionsDatasetName;
     }
 
     public String getResultsDatasetName() {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 175280e..6cf7f50 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -82,8 +82,9 @@
         String channelName =
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX))
                         .getStringValue();
-        String subscriptionsName = ((AString) channelRecord
-                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+        String channelSubscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+                        .getStringValue();
         String resultsName =
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX))
                         .getStringValue();
@@ -127,10 +128,15 @@
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
                         .getStringValue();
 
+        String brokerSubscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+                        .getStringValue();
+
         FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
                 Integer.parseInt(functionSignature.get(2)));
 
-        channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
+        channel = new Channel(dataverseName, channelName, channelSubscriptionsName, brokerSubscriptionsName,
+                resultsName, signature, duration,
                 dependencies, channelBody);
         return channel;
     }
@@ -164,9 +170,10 @@
 
         // write field 2
         fieldValue.reset();
-        aString.setValue(channel.getSubscriptionsDataset());
+        aString.setValue(channel.getChannelSubscriptionsDataset());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+                fieldValue);
 
         // write field 3
         fieldValue.reset();
@@ -227,6 +234,13 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
 
+        // write field 8
+        fieldValue.reset();
+        aString.setValue(channel.getBrokerSubscriptionsDataset());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+                fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..4ec6d2f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -28,9 +28,11 @@
 import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.FunctionInfo;
 import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -47,6 +49,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -58,6 +61,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -76,25 +80,19 @@
         if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
-        boolean push = false;
 
-        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
-            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
-                return false;
-            }
+        boolean push = false;
+        AbstractLogicalOperator op = findOp(op1, 4, "", "");
+        if (op == null) {
             push = true;
         }
+
         DataSourceScanOperator subscriptionsScan;
         String channelDataverse;
         String channelName;
 
         if (!push) {
-            DelegateOperator eOp = (DelegateOperator) op;
-            if (!(eOp.getDelegate() instanceof CommitOperator)) {
-                return false;
-            }
-            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
             if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                 return false;
             }
@@ -113,15 +111,20 @@
             //Now we know that we are inserting into results
 
             channelName = datasetName.substring(0, datasetName.length() - 7);
-            String subscriptionsName = channelName + "Subscriptions";
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+            String subscriptionsName = channelName + "ChannelSubscriptions";
+            subscriptionsScan =
+                    (DataSourceScanOperator) findOp(op, 3, subscriptionsName, BADConstants.ChannelSubscriptionsType);
             if (subscriptionsScan == null) {
                 return false;
             }
 
         } else {
+            op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+                return false;
+            }
             //if push, get the channel name here instead
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "", BADConstants.ChannelSubscriptionsType);
             if (subscriptionsScan == null) {
                 return false;
             }
@@ -131,15 +134,8 @@
             channelName = datasetName.substring(0, datasetName.length() - 13);
         }
 
-        //Now we need to get the broker EndPoint
-        LogicalVariable brokerEndpointVar = context.newVar();
-        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
-        if (opAboveBrokersScan == null) {
-            return false;
-        }
-
-        //get subscriptionIdVar
-        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+        //get channelSubscriptionIdVar
+        LogicalVariable channelSubscriptionIdVar = subscriptionsScan.getVariables().get(0);
 
         //The channelExecutionTime is created just before the scan
         ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -155,25 +151,48 @@
             ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
         }
 
-        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
-        //now brokerNameVar holds the brokerName for use farther up in the plan
+        //move broker scan
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "", "");
+        if (opAboveBrokersScan == null) {
+            return false;
+        }
+        DataSourceScanOperator brokerScan =
+                moveScans(opAboveBrokersScan, op1, context, channelName + "BrokerSubscriptions");
 
-        context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
-        context.computeAndSetTypeEnvironmentForOperator(op);
+        if (brokerScan == null) {
+            return false;
+        }
+        DataSourceScanOperator brokerSubscriptionScan =
+                (DataSourceScanOperator) brokerScan.getInputs().get(0).getValue();
 
-        ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
-        badProject.getVariables().add(subscriptionIdVar);
-        badProject.getVariables().add(brokerEndpointVar);
+        //Add select to join subscriptions and broker and assign to get endpoint
+        LogicalVariable brokerDataverseVar = brokerScan.getVariables().get(0);
+        LogicalVariable brokerNameVar = brokerScan.getVariables().get(1);
+        LogicalVariable brokerVar = brokerScan.getVariables().get(2);
+        LogicalVariable brokerSubscriptionVar = brokerSubscriptionScan.getVariables().get(2);
+        LogicalVariable brokerSubscriptionIdVar = brokerSubscriptionScan.getVariables().get(1);
+        LogicalVariable brokerSubscriptionChannelIdVar = brokerSubscriptionScan.getVariables().get(0);
+
+        LogicalVariable brokerEndpointVar = context.newVar();
+        AssignOperator assign = createAssignAndSelect(brokerDataverseVar, brokerNameVar, brokerVar,
+                brokerSubscriptionVar, brokerSubscriptionIdVar, brokerEndpointVar, channelSubscriptionIdVar,
+                brokerSubscriptionChannelIdVar, context, op1);
+
+        context.computeAndSetTypeEnvironmentForOperator(op1);
+
+        //Maybe we need to add a project???
+        ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
         badProject.getVariables().add(channelExecutionVar);
+        badProject.getVariables().add(channelSubscriptionIdVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
 
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
-                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+                        context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
+                : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+                        assign,
                         (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
@@ -181,6 +200,98 @@
         return true;
     }
 
+    private AssignOperator createAssignAndSelect(LogicalVariable brokerDataverseVar, LogicalVariable brokerNameVar,
+            LogicalVariable brokerVar, LogicalVariable brokerSubscriptionVar, LogicalVariable brokerSubscriptionIdVar,
+            LogicalVariable brokerEndpointVar, LogicalVariable channelSubscriptionIdVar,
+            LogicalVariable brokerSubscriptionChannelIdVar, IOptimizationContext context, AbstractLogicalOperator op1)
+            throws AlgebricksException {
+
+        FunctionInfo finfoGetField =
+                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_INDEX);
+        FunctionInfo finfoGetEquality = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.EQ);
+        FunctionInfo finfoGetAnd = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND);
+
+        //Create Select Operator
+        //The operator matches (A) the broker dataverse and name between broker subscriptions and brokers
+        // and (B) the channel subscription Id between broker subscriptions and channel subscriptions
+        ScalarFunctionCallExpression getBrokerName = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(3)))));
+        ScalarFunctionCallExpression getBrokerDataverse = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+
+        VariableReferenceExpression BrokerBrokerNameReference = new VariableReferenceExpression(brokerNameVar);
+        VariableReferenceExpression BrokerBrokerDataverseReference =
+                new VariableReferenceExpression(brokerDataverseVar);
+        VariableReferenceExpression brokerSubscriptionChannelIdVarReference =
+                new VariableReferenceExpression(brokerSubscriptionChannelIdVar);
+
+        ScalarFunctionCallExpression brokerNameCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+                new MutableObject<>(getBrokerName), new MutableObject<>(BrokerBrokerNameReference));
+        ScalarFunctionCallExpression brokerDataverseCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+                new MutableObject<>(getBrokerDataverse), new MutableObject<>(BrokerBrokerDataverseReference));
+        ScalarFunctionCallExpression channelSubCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+                new MutableObject<>(brokerSubscriptionChannelIdVarReference),
+                new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
+
+        ScalarFunctionCallExpression andExpression =
+                new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+                        new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
+
+        SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
+        select.getInputs().addAll(op1.getInputs());
+
+        //Create Assign Operator
+        ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(brokerVar)),
+                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+        AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
+        assign.getInputs().add(new MutableObject<>(select));
+
+        op1.getInputs().set(0, new MutableObject<>(assign));
+        context.computeAndSetTypeEnvironmentForOperator(select);
+        context.computeAndSetTypeEnvironmentForOperator(assign);
+        context.computeAndSetTypeEnvironmentForOperator(op1);
+
+        return assign;
+
+    }
+
+    private DataSourceScanOperator moveScans(AbstractLogicalOperator opAboveScan, AbstractLogicalOperator op1,
+            IOptimizationContext context, String subscriptionsName) throws AlgebricksException {
+
+        DataSourceScanOperator brokerScan = null;
+        int i = 0;
+        for (Mutable<ILogicalOperator> subOp : opAboveScan.getInputs()) {
+            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                brokerScan = (DataSourceScanOperator) subOp.getValue();
+                break;
+            }
+            i++;
+        }
+        if (brokerScan == null) {
+            return null;
+        }
+
+        AbstractLogicalOperator brokerSubcriptionsScan =
+                (AbstractLogicalOperator) brokerScan.getInputs().get(0).getValue();
+
+        if (!isSubscriptionsScan(brokerSubcriptionsScan, subscriptionsName, BADConstants.BrokerSubscriptionsType)) {
+            return null;
+        }
+        opAboveScan.getInputs().set(i, brokerSubcriptionsScan.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(opAboveScan);
+
+        brokerSubcriptionsScan.getInputs().set(0, op1.getInputs().get(0));
+        op1.getInputs().set(0, new MutableObject<>(brokerScan));
+        context.computeAndSetTypeEnvironmentForOperator(brokerSubcriptionsScan);
+        context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+        context.computeAndSetTypeEnvironmentForOperator(op1);
+        return brokerScan;
+
+    }
+
     private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
             LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
             IAType resultType) {
@@ -310,49 +421,66 @@
     }
 
     /*This function is used to find specific operators within the plan, either
-     * A. The brokers dataset scan
-     * B. The subscriptions scan
-     * C. The highest project of the plan
+     * 1. The brokers dataset scan
+     * 2. The highest project of the plan
+     * 3. The subscriptions scan
+     * 4. Commit operator
+     *
+     * param1 is the name of the expected subscriptions dataset when searching for a subscriptions scan
+     * and param2 is the type of the subscriptions dataset (channel or broker)
      */
-    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
         if (!op.hasInputs()) {
             return null;
         }
         for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
-            if (lookingForString.equals("brokers")) {
+            if (searchId == 1) {
                 if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
                     return op;
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
 
-            } else if (lookingForString.equals("project")) {
+            } else if (searchId == 2) {
                 if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
             }
 
-            else {
-                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+            else if (searchId == 3) {
+                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param1, param2)) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
 
+            } else if (searchId == 4) {
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator dOp = (DelegateOperator) subOp.getValue();
+                    if (dOp.getDelegate() instanceof CommitOperator) {
+                        return (AbstractLogicalOperator) subOp.getValue();
+                    }
+                } else {
+                    AbstractLogicalOperator nestedOp =
+                            findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param1, param2);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
             }
         }
         return null;
@@ -371,12 +499,12 @@
         return false;
     }
 
-    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName, String subscriptionType) {
         if (op instanceof DataSourceScanOperator) {
             if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
                 if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
-                        && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+                        && dds.getDataset().getItemTypeName().equals(subscriptionType)) {
                     if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
                         return true;
                     }