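Better push version: drop the group-by over broker subscription ids so each push payload carries a single subscriptionId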
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 76e3e82..9a2c6b1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -257,9 +257,9 @@
*
* push version:
* SET inline_with "false";
- * select value {"payload": {"result":r, "subscriptionId": brokerSubId}} from (
+ * select value {"payload": {"result":result, "subscriptionId": brokerSubId}} from (
* with channelExecutionTime as current_datetime()
- * select b.BrokerEndPoint, result as r, bs.brokerSubId as brokerSubId, channelExecutionTime
+ * select b.BrokerEndPoint, result, bs.brokerSubId as brokerSubId, channelExecutionTime
* from steven.EmergencyChannelChannelSubscriptions sub,
* steven.RecentEmergenciesNearUser(sub.param0) result,
* steven.EmergencyChannelBrokerSubscriptions bs,
@@ -291,7 +291,7 @@
} else {
builder.append(
"select value {\"payload\": {\"" + FUNCTION_RESULT_VAR + "\":" + FUNCTION_RESULT_VAR
- + ", \"subscriptionIds\": brokerSubIds}} from (\n");
+ + ", \"subscriptionId\": brokerSubId}} from (\n");
builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append(
"select " + BROKER_RECORD_VAR + "." + BADConstants.BrokerEndPoint + ", " + FUNCTION_RESULT_VAR
@@ -334,10 +334,7 @@
builder.append(" returning " + INSERTED_RECORD_VAR);
} else {
- builder.append(") results\n");
- builder.append("group by " + BADConstants.BrokerEndPoint + "," + FUNCTION_RESULT_VAR + ", "
- + BADConstants.ChannelExecutionTime);
- builder.append(" group as brokerSubIds (" + BADConstants.BrokerSubscriptionId + " as subscriptionId)");
+ builder.append(") results");
}
builder.append(";");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 18cd41a..2596421 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -92,9 +92,9 @@
String channelName;
AssignOperator pushAssign = null;
AssignOperator newAssign = null;
- GroupByOperator pushGroupBy = null;
+ UnnestOperator pushUnnest = null;
LogicalVariable brokerSubsVar = null;
- LogicalVariable brokerEndpoint = null;
+ LogicalVariable brokerEndpoint = context.newVar();
LogicalVariable brokerSubId = null;
if (!push) {
@@ -138,12 +138,6 @@
}
pushAssign = (AssignOperator) op2;
- AbstractLogicalOperator op3 = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue();
- if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) {
- return false;
- }
- pushGroupBy = (GroupByOperator) op3;
-
//if push, get the channel name here instead
subscriptionsScan = (DataSourceScanOperator) findOp(op, LogicalOperatorTag.DATASOURCESCAN, "",
BADConstants.ChannelSubscriptionsType);
@@ -154,7 +148,13 @@
String datasetName = dds.getDataset().getDatasetName();
channelDataverse = dds.getDataset().getDataverseName();
channelName = datasetName.substring(0, datasetName.length() - 13);
- brokerEndpoint = pushGroupBy.getGroupByList().get(0).first;
+
+ AbstractLogicalOperator unnest = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue();
+ if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+ return false;
+ }
+
+ pushUnnest = (UnnestOperator) unnest;
}
//The channelExecutionTime is created just before the scan
@@ -173,7 +173,6 @@
if (subplanOperator == null) {
return false;
}
- brokerEndpoint = context.newVar();
brokerSubId = context.newVar();
brokerSubsVar = ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
.getVariables().get(0);
@@ -181,19 +180,18 @@
newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context);
context.computeAndSetTypeEnvironmentForOperator(op1);
- } else {
- channelExecutionVar = pushGroupBy.getGroupByList().get(2).first;
+
+ //Maintain the variables through the existing project
+ ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", "");
+ badProject.getVariables().add(channelExecutionVar);
+ badProject.getVariables().add(brokerSubsVar);
+ context.computeAndSetTypeEnvironmentForOperator(badProject);
}
- //Maintain the variables through the existing project
- ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", "");
- badProject.getVariables().add(channelExecutionVar);
- badProject.getVariables().add(push ? brokerEndpoint : brokerSubsVar);
- context.computeAndSetTypeEnvironmentForOperator(badProject);
//Create my brokerNotify plan above the extension Operator
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpoint, channelExecutionVar, context, pushAssign, channelDataverse,
- channelName)
+ channelName, pushUnnest)
: createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign,
channelDataverse, channelName);
@@ -262,7 +260,7 @@
private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar,
LogicalVariable channelExecutionVar, IOptimizationContext context, AssignOperator payLoadAssign,
- String channelDataverse, String channelName)
+ String channelDataverse, String channelName, UnnestOperator pushUnnest)
throws AlgebricksException {
IVariableTypeEnvironment env = payLoadAssign.computeOutputTypeEnvironment(context);
IAType resultType = (IAType) env.getVarType(payLoadAssign.getVariables().get(0));
@@ -273,6 +271,21 @@
channelName, true, resultType);
extensionOp.getInputs().add(new MutableObject<>(payLoadAssign));
+
+ FunctionInfo finfoGetField = (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+ ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(pushUnnest.getVariable())), new MutableObject<>(
+ new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
+
+ AssignOperator assignBrokerEndPoint =
+ new AssignOperator(brokerEndpointVar, new MutableObject<>(getBrokerEndPoint));
+
+ assignBrokerEndPoint.getInputs().addAll(payLoadAssign.getInputs());
+ payLoadAssign.getInputs().clear();
+ payLoadAssign.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
+
+ context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
+ context.computeAndSetTypeEnvironmentForOperator(payLoadAssign);
context.computeAndSetTypeEnvironmentForOperator(extensionOp);
return extensionOp;
@@ -376,9 +389,17 @@
} else if (searchTag == LogicalOperatorTag.PROJECT) {
return (AbstractLogicalOperator) subOp.getValue();
- } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN && isSubscriptionsScan(
+ } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN) {
+ if (isSubscriptionsScan(
(AbstractLogicalOperator) subOp.getValue(), subscriptionsName, subscriptionType)) {
- return (AbstractLogicalOperator) subOp.getValue();
+ return (AbstractLogicalOperator) subOp.getValue();
+ } else {
+ AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
+ subscriptionsName, subscriptionType);
+ if (nestedOp != null) {
+ return nestedOp;
+ }
+ }
} else if (searchTag == LogicalOperatorTag.DELEGATE_OPERATOR) {
DelegateOperator dOp = (DelegateOperator) subOp.getValue();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 8e07af2..b604f75 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -190,8 +190,6 @@
}
if (push) {
- int pushOffset = inputArg1.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
if (!firstResult) {
sendStreams.get(endpoint).append(',');
}