Got pull channel plan fully working
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 4ec6d2f..1ef678d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -191,7 +191,7 @@
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
- : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+ : createNotifyBrokerPullPlan(brokerEndpointVar, brokerSubscriptionIdVar, channelExecutionVar, context,
assign,
(DistributeResultOperator) op1, channelDataverse, channelName);
@@ -235,22 +235,30 @@
new MutableObject<>(brokerSubscriptionChannelIdVarReference),
new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
- ScalarFunctionCallExpression andExpression =
- new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+ ScalarFunctionCallExpression brokerAndExpression = new ScalarFunctionCallExpression(finfoGetAnd,
new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
- SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
- select.getInputs().addAll(op1.getInputs());
+ DataSourceScanOperator brokerScan = (DataSourceScanOperator) op1.getInputs().get(0).getValue();
+
+ SelectOperator selectSubscriptions = new SelectOperator(new MutableObject<>(channelSubCheck), false, null);
+ selectSubscriptions.getInputs().add(brokerScan.getInputs().get(0));
+
+ SelectOperator selectBrokers = new SelectOperator(new MutableObject<>(brokerAndExpression), false, null);
+ selectBrokers.getInputs().add(new MutableObject<>(brokerScan));
+
+ brokerScan.getInputs().set(0, new MutableObject<>(selectSubscriptions));
//Create Assign Operator
ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
new MutableObject<>(new VariableReferenceExpression(brokerVar)),
new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
- assign.getInputs().add(new MutableObject<>(select));
+ assign.getInputs().add(new MutableObject<>(selectBrokers));
op1.getInputs().set(0, new MutableObject<>(assign));
- context.computeAndSetTypeEnvironmentForOperator(select);
+ context.computeAndSetTypeEnvironmentForOperator(selectSubscriptions);
+ context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+ context.computeAndSetTypeEnvironmentForOperator(selectBrokers);
context.computeAndSetTypeEnvironmentForOperator(assign);
context.computeAndSetTypeEnvironmentForOperator(op1);