Progress?
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 5e4222e..1551a7c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -252,8 +252,8 @@
         builder.append(BADConstants.ChannelExecutionTime + ", ");
         builder.append("sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
         builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
-        builder.append("from " + dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
-        builder.append(dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+        builder.append("from " + dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+        builder.append(dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
         builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
         builder.append(function.getNamespace() + "." + function.getName() + "(");
         int i = 0;
@@ -261,10 +261,6 @@
             builder.append("sub.param" + i + ",");
         }
         builder.append("sub.param" + i + ") result \n");
-        builder.append("where b." + BADConstants.BrokerName + " = bs." + BADConstants.BrokerName + "\n");
-        builder.append("and b." + BADConstants.DataverseName + " = bs." + BADConstants.DataverseName + "\n");
-        builder.append(
-                "and bs." + BADConstants.ChannelSubscriptionId + " = sub." + BADConstants.ChannelSubscriptionId + "\n");
         if (!push) {
             builder.append(")");
             builder.append(" returning a");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 82ca1f1..4ec6d2f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -28,9 +28,11 @@
 import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.FunctionInfo;
 import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -47,6 +49,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -58,6 +61,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -78,7 +82,7 @@
         }
 
         boolean push = false;
-        AbstractLogicalOperator op = findOp(op1, 4, "");
+        AbstractLogicalOperator op = findOp(op1, 4, "", "");
         if (op == null) {
             push = true;
         }
@@ -107,8 +111,9 @@
             //Now we know that we are inserting into results
 
             channelName = datasetName.substring(0, datasetName.length() - 7);
-            String subscriptionsName = channelName + "BrokerSubscriptions";
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, subscriptionsName);
+            String subscriptionsName = channelName + "ChannelSubscriptions";
+            subscriptionsScan =
+                    (DataSourceScanOperator) findOp(op, 3, subscriptionsName, BADConstants.ChannelSubscriptionsType);
             if (subscriptionsScan == null) {
                 return false;
             }
@@ -119,7 +124,7 @@
                 return false;
             }
             //if push, get the channel name here instead
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "");
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "", BADConstants.ChannelSubscriptionsType);
             if (subscriptionsScan == null) {
                 return false;
             }
@@ -129,15 +134,8 @@
             channelName = datasetName.substring(0, datasetName.length() - 13);
         }
 
-        //Now we need to get the broker EndPoint
-        LogicalVariable brokerEndpointVar = context.newVar();
-        AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "");
-        if (opAboveBrokersScan == null) {
-            return false;
-        }
-
-        //get subscriptionIdVar
-        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(1);
+        //get channelSubscriptionIdVar
+        LogicalVariable channelSubscriptionIdVar = subscriptionsScan.getVariables().get(0);
 
         //The channelExecutionTime is created just before the scan
         ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -153,25 +151,48 @@
             ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
         }
 
-        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
-        //now brokerNameVar holds the brokerName for use farther up in the plan
+        //move broker scan
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "", "");
+        if (opAboveBrokersScan == null) {
+            return false;
+        }
+        DataSourceScanOperator brokerScan =
+                moveScans(opAboveBrokersScan, op1, context, channelName + "BrokerSubscriptions");
 
-        context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
-        context.computeAndSetTypeEnvironmentForOperator(op);
+        if (brokerScan == null) {
+            return false;
+        }
+        DataSourceScanOperator brokerSubscriptionScan =
+                (DataSourceScanOperator) brokerScan.getInputs().get(0).getValue();
 
-        ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "");
-        badProject.getVariables().add(subscriptionIdVar);
-        badProject.getVariables().add(brokerEndpointVar);
+        //Add select to join subscriptions and broker and assign to get endpoint
+        LogicalVariable brokerDataverseVar = brokerScan.getVariables().get(0);
+        LogicalVariable brokerNameVar = brokerScan.getVariables().get(1);
+        LogicalVariable brokerVar = brokerScan.getVariables().get(2);
+        LogicalVariable brokerSubscriptionVar = brokerSubscriptionScan.getVariables().get(2);
+        LogicalVariable brokerSubscriptionIdVar = brokerSubscriptionScan.getVariables().get(1);
+        LogicalVariable brokerSubscriptionChannelIdVar = brokerSubscriptionScan.getVariables().get(0);
+
+        LogicalVariable brokerEndpointVar = context.newVar();
+        AssignOperator assign = createAssignAndSelect(brokerDataverseVar, brokerNameVar, brokerVar,
+                brokerSubscriptionVar, brokerSubscriptionIdVar, brokerEndpointVar, channelSubscriptionIdVar,
+                brokerSubscriptionChannelIdVar, context, op1);
+
+        context.computeAndSetTypeEnvironmentForOperator(op1);
+
+        //Maybe we need to add a project???
+        ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
         badProject.getVariables().add(channelExecutionVar);
+        badProject.getVariables().add(channelSubscriptionIdVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
 
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
-                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+                        context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
+                : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+                        assign,
                         (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
@@ -179,6 +200,98 @@
         return true;
     }
 
+    private AssignOperator createAssignAndSelect(LogicalVariable brokerDataverseVar, LogicalVariable brokerNameVar,
+            LogicalVariable brokerVar, LogicalVariable brokerSubscriptionVar, LogicalVariable brokerSubscriptionIdVar,
+            LogicalVariable brokerEndpointVar, LogicalVariable channelSubscriptionIdVar,
+            LogicalVariable brokerSubscriptionChannelIdVar, IOptimizationContext context, AbstractLogicalOperator op1)
+            throws AlgebricksException {
+
+        FunctionInfo finfoGetField =
+                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_INDEX);
+        FunctionInfo finfoGetEquality = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.EQ);
+        FunctionInfo finfoGetAnd = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND);
+
+        //Create Select Operator
+        //The operator matches (A) the broker dataverse and name between broker subscriptions and brokers
+        // and (B) the channel subscription Id between broker subscriptions and channel subscriptions
+        ScalarFunctionCallExpression getBrokerName = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(3)))));
+        ScalarFunctionCallExpression getBrokerDataverse = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+
+        VariableReferenceExpression BrokerBrokerNameReference = new VariableReferenceExpression(brokerNameVar);
+        VariableReferenceExpression BrokerBrokerDataverseReference =
+                new VariableReferenceExpression(brokerDataverseVar);
+        VariableReferenceExpression brokerSubscriptionChannelIdVarReference =
+                new VariableReferenceExpression(brokerSubscriptionChannelIdVar);
+
+        ScalarFunctionCallExpression brokerNameCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+                new MutableObject<>(getBrokerName), new MutableObject<>(BrokerBrokerNameReference));
+        ScalarFunctionCallExpression brokerDataverseCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+                new MutableObject<>(getBrokerDataverse), new MutableObject<>(BrokerBrokerDataverseReference));
+        ScalarFunctionCallExpression channelSubCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+                new MutableObject<>(brokerSubscriptionChannelIdVarReference),
+                new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
+
+        ScalarFunctionCallExpression andExpression =
+                new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+                        new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
+
+        SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
+        select.getInputs().addAll(op1.getInputs());
+
+        //Create Assign Operator
+        ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(brokerVar)),
+                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+        AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
+        assign.getInputs().add(new MutableObject<>(select));
+
+        op1.getInputs().set(0, new MutableObject<>(assign));
+        context.computeAndSetTypeEnvironmentForOperator(select);
+        context.computeAndSetTypeEnvironmentForOperator(assign);
+        context.computeAndSetTypeEnvironmentForOperator(op1);
+
+        return assign;
+
+    }
+
+    private DataSourceScanOperator moveScans(AbstractLogicalOperator opAboveScan, AbstractLogicalOperator op1,
+            IOptimizationContext context, String subscriptionsName) throws AlgebricksException {
+
+        DataSourceScanOperator brokerScan = null;
+        int i = 0;
+        for (Mutable<ILogicalOperator> subOp : opAboveScan.getInputs()) {
+            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                brokerScan = (DataSourceScanOperator) subOp.getValue();
+                break;
+            }
+            i++;
+        }
+        if (brokerScan == null) {
+            return null;
+        }
+
+        AbstractLogicalOperator brokerSubcriptionsScan =
+                (AbstractLogicalOperator) brokerScan.getInputs().get(0).getValue();
+
+        if (!isSubscriptionsScan(brokerSubcriptionsScan, subscriptionsName, BADConstants.BrokerSubscriptionsType)) {
+            return null;
+        }
+        opAboveScan.getInputs().set(i, brokerSubcriptionsScan.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(opAboveScan);
+
+        brokerSubcriptionsScan.getInputs().set(0, op1.getInputs().get(0));
+        op1.getInputs().set(0, new MutableObject<>(brokerScan));
+        context.computeAndSetTypeEnvironmentForOperator(brokerSubcriptionsScan);
+        context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+        context.computeAndSetTypeEnvironmentForOperator(op1);
+        return brokerScan;
+
+    }
+
     private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
             LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
             IAType resultType) {
@@ -313,9 +426,10 @@
      * 3. The subscriptions scan
      * 4. Commit operator
      *
-     * param is the name of the channel when searching for a subscriptions scan
+     * param1 is the name of the expected subscriptions dataset when searching for a subscriptions scan
+     * and param2 is the type of the subscriptions dataset (channel or broker)
      */
-    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param) {
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
         if (!op.hasInputs()) {
             return null;
         }
@@ -325,7 +439,7 @@
                     return op;
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            searchId, param);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
@@ -336,7 +450,7 @@
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            searchId, param);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
@@ -344,11 +458,11 @@
             }
 
             else if (searchId == 3) {
-                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param)) {
+                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param1, param2)) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            searchId, param);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
@@ -362,7 +476,7 @@
                     }
                 } else {
                     AbstractLogicalOperator nestedOp =
-                            findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param);
+                            findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
@@ -385,12 +499,12 @@
         return false;
     }
 
-    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName, String subscriptionType) {
         if (op instanceof DataSourceScanOperator) {
             if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
                 if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
-                        && dds.getDataset().getItemTypeName().equals(BADConstants.BrokerSubscriptionsType)) {
+                        && dds.getDataset().getItemTypeName().equals(subscriptionType)) {
                     if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
                         return true;
                     }