Got pull channel plan fully working
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 4ec6d2f..1ef678d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -191,7 +191,7 @@
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
                         context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+                : createNotifyBrokerPullPlan(brokerEndpointVar, brokerSubscriptionIdVar, channelExecutionVar, context,
                         assign,
                         (DistributeResultOperator) op1, channelDataverse, channelName);
 
@@ -235,22 +235,30 @@
                 new MutableObject<>(brokerSubscriptionChannelIdVarReference),
                 new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
 
-        ScalarFunctionCallExpression andExpression =
-                new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+        ScalarFunctionCallExpression brokerAndExpression = new ScalarFunctionCallExpression(finfoGetAnd,
                         new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
 
-        SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
-        select.getInputs().addAll(op1.getInputs());
+        DataSourceScanOperator brokerScan = (DataSourceScanOperator) op1.getInputs().get(0).getValue();
+
+        SelectOperator selectSubscriptions = new SelectOperator(new MutableObject<>(channelSubCheck), false, null);
+        selectSubscriptions.getInputs().add(brokerScan.getInputs().get(0));
+
+        SelectOperator selectBrokers = new SelectOperator(new MutableObject<>(brokerAndExpression), false, null);
+        selectBrokers.getInputs().add(new MutableObject<>(brokerScan));
+
+        brokerScan.getInputs().set(0, new MutableObject<>(selectSubscriptions));
 
         //Create Assign Operator
         ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
                 new MutableObject<>(new VariableReferenceExpression(brokerVar)),
                 new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
         AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
-        assign.getInputs().add(new MutableObject<>(select));
+        assign.getInputs().add(new MutableObject<>(selectBrokers));
 
         op1.getInputs().set(0, new MutableObject<>(assign));
-        context.computeAndSetTypeEnvironmentForOperator(select);
+        context.computeAndSetTypeEnvironmentForOperator(selectSubscriptions);
+        context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+        context.computeAndSetTypeEnvironmentForOperator(selectBrokers);
         context.computeAndSetTypeEnvironmentForOperator(assign);
         context.computeAndSetTypeEnvironmentForOperator(op1);