Progress?
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 5e4222e..1551a7c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -252,8 +252,8 @@
builder.append(BADConstants.ChannelExecutionTime + ", ");
builder.append("sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
- builder.append("from " + dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
- builder.append(dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+ builder.append("from " + dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+ builder.append(dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
builder.append(function.getNamespace() + "." + function.getName() + "(");
int i = 0;
@@ -261,10 +261,6 @@
builder.append("sub.param" + i + ",");
}
builder.append("sub.param" + i + ") result \n");
- builder.append("where b." + BADConstants.BrokerName + " = bs." + BADConstants.BrokerName + "\n");
- builder.append("and b." + BADConstants.DataverseName + " = bs." + BADConstants.DataverseName + "\n");
- builder.append(
- "and bs." + BADConstants.ChannelSubscriptionId + " = sub." + BADConstants.ChannelSubscriptionId + "\n");
if (!push) {
builder.append(")");
builder.append(" returning a");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 82ca1f1..4ec6d2f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -28,9 +28,11 @@
import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.FunctionInfo;
import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -47,6 +49,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -58,6 +61,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -78,7 +82,7 @@
}
boolean push = false;
- AbstractLogicalOperator op = findOp(op1, 4, "");
+ AbstractLogicalOperator op = findOp(op1, 4, "", "");
if (op == null) {
push = true;
}
@@ -107,8 +111,9 @@
//Now we know that we are inserting into results
channelName = datasetName.substring(0, datasetName.length() - 7);
- String subscriptionsName = channelName + "BrokerSubscriptions";
- subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, subscriptionsName);
+ String subscriptionsName = channelName + "ChannelSubscriptions";
+ subscriptionsScan =
+ (DataSourceScanOperator) findOp(op, 3, subscriptionsName, BADConstants.ChannelSubscriptionsType);
if (subscriptionsScan == null) {
return false;
}
@@ -119,7 +124,7 @@
return false;
}
//if push, get the channel name here instead
- subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "");
+ subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "", BADConstants.ChannelSubscriptionsType);
if (subscriptionsScan == null) {
return false;
}
@@ -129,15 +134,8 @@
channelName = datasetName.substring(0, datasetName.length() - 13);
}
- //Now we need to get the broker EndPoint
- LogicalVariable brokerEndpointVar = context.newVar();
- AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "");
- if (opAboveBrokersScan == null) {
- return false;
- }
-
- //get subscriptionIdVar
- LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(1);
+ //get channelSubscriptionIdVar
+ LogicalVariable channelSubscriptionIdVar = subscriptionsScan.getVariables().get(0);
//The channelExecutionTime is created just before the scan
ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -153,25 +151,48 @@
((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
}
- AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
- //now brokerNameVar holds the brokerName for use farther up in the plan
+ //move broker scan
+ AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "", "");
+ if (opAboveBrokersScan == null) {
+ return false;
+ }
+ DataSourceScanOperator brokerScan =
+ moveScans(opAboveBrokersScan, op1, context, channelName + "BrokerSubscriptions");
- context.computeAndSetTypeEnvironmentForOperator(assignOp);
- context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
- context.computeAndSetTypeEnvironmentForOperator(op);
+ if (brokerScan == null) {
+ return false;
+ }
+ DataSourceScanOperator brokerSubscriptionScan =
+ (DataSourceScanOperator) brokerScan.getInputs().get(0).getValue();
- ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "");
- badProject.getVariables().add(subscriptionIdVar);
- badProject.getVariables().add(brokerEndpointVar);
+ //Add select to join subscriptions and broker and assign to get endpoint
+ LogicalVariable brokerDataverseVar = brokerScan.getVariables().get(0);
+ LogicalVariable brokerNameVar = brokerScan.getVariables().get(1);
+ LogicalVariable brokerVar = brokerScan.getVariables().get(2);
+ LogicalVariable brokerSubscriptionVar = brokerSubscriptionScan.getVariables().get(2);
+ LogicalVariable brokerSubscriptionIdVar = brokerSubscriptionScan.getVariables().get(1);
+ LogicalVariable brokerSubscriptionChannelIdVar = brokerSubscriptionScan.getVariables().get(0);
+
+ LogicalVariable brokerEndpointVar = context.newVar();
+ AssignOperator assign = createAssignAndSelect(brokerDataverseVar, brokerNameVar, brokerVar,
+ brokerSubscriptionVar, brokerSubscriptionIdVar, brokerEndpointVar, channelSubscriptionIdVar,
+ brokerSubscriptionChannelIdVar, context, op1);
+
+ context.computeAndSetTypeEnvironmentForOperator(op1);
+
+ //Maybe we need to add a project???
+ ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
badProject.getVariables().add(channelExecutionVar);
+ badProject.getVariables().add(channelSubscriptionIdVar);
context.computeAndSetTypeEnvironmentForOperator(badProject);
//Create my brokerNotify plan above the extension Operator
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
- context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
- : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+ context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
+ : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+ assign,
(DistributeResultOperator) op1, channelDataverse, channelName);
opRef.setValue(dOp);
@@ -179,6 +200,98 @@
return true;
}
+ private AssignOperator createAssignAndSelect(LogicalVariable brokerDataverseVar, LogicalVariable brokerNameVar,
+ LogicalVariable brokerVar, LogicalVariable brokerSubscriptionVar, LogicalVariable brokerSubscriptionIdVar,
+ LogicalVariable brokerEndpointVar, LogicalVariable channelSubscriptionIdVar,
+ LogicalVariable brokerSubscriptionChannelIdVar, IOptimizationContext context, AbstractLogicalOperator op1)
+ throws AlgebricksException {
+
+ FunctionInfo finfoGetField =
+ (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_INDEX);
+ FunctionInfo finfoGetEquality = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.EQ);
+ FunctionInfo finfoGetAnd = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND);
+
+ //Create Select Operator
+ //The operator matches (A) the broker dataverse and name between broker subscriptions and brokers
+ // and (B) the channel subscription Id between broker subscriptions and channel subscriptions
+ ScalarFunctionCallExpression getBrokerName = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+ new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(3)))));
+ ScalarFunctionCallExpression getBrokerDataverse = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+ new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+
+ VariableReferenceExpression BrokerBrokerNameReference = new VariableReferenceExpression(brokerNameVar);
+ VariableReferenceExpression BrokerBrokerDataverseReference =
+ new VariableReferenceExpression(brokerDataverseVar);
+ VariableReferenceExpression brokerSubscriptionChannelIdVarReference =
+ new VariableReferenceExpression(brokerSubscriptionChannelIdVar);
+
+ ScalarFunctionCallExpression brokerNameCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+ new MutableObject<>(getBrokerName), new MutableObject<>(BrokerBrokerNameReference));
+ ScalarFunctionCallExpression brokerDataverseCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+ new MutableObject<>(getBrokerDataverse), new MutableObject<>(BrokerBrokerDataverseReference));
+ ScalarFunctionCallExpression channelSubCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+ new MutableObject<>(brokerSubscriptionChannelIdVarReference),
+ new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
+
+ ScalarFunctionCallExpression andExpression =
+ new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+ new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
+
+ SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
+ select.getInputs().addAll(op1.getInputs());
+
+ //Create Assign Operator
+ ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(brokerVar)),
+ new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+ AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
+ assign.getInputs().add(new MutableObject<>(select));
+
+ op1.getInputs().set(0, new MutableObject<>(assign));
+ context.computeAndSetTypeEnvironmentForOperator(select);
+ context.computeAndSetTypeEnvironmentForOperator(assign);
+ context.computeAndSetTypeEnvironmentForOperator(op1);
+
+ return assign;
+
+ }
+
+ private DataSourceScanOperator moveScans(AbstractLogicalOperator opAboveScan, AbstractLogicalOperator op1,
+ IOptimizationContext context, String subscriptionsName) throws AlgebricksException {
+
+ DataSourceScanOperator brokerScan = null;
+ int i = 0;
+ for (Mutable<ILogicalOperator> subOp : opAboveScan.getInputs()) {
+ if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+ brokerScan = (DataSourceScanOperator) subOp.getValue();
+ break;
+ }
+ i++;
+ }
+ if (brokerScan == null) {
+ return null;
+ }
+
+ AbstractLogicalOperator brokerSubcriptionsScan =
+ (AbstractLogicalOperator) brokerScan.getInputs().get(0).getValue();
+
+ if (!isSubscriptionsScan(brokerSubcriptionsScan, subscriptionsName, BADConstants.BrokerSubscriptionsType)) {
+ return null;
+ }
+ opAboveScan.getInputs().set(i, brokerSubcriptionsScan.getInputs().get(0));
+ context.computeAndSetTypeEnvironmentForOperator(opAboveScan);
+
+ brokerSubcriptionsScan.getInputs().set(0, op1.getInputs().get(0));
+ op1.getInputs().set(0, new MutableObject<>(brokerScan));
+ context.computeAndSetTypeEnvironmentForOperator(brokerSubcriptionsScan);
+ context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+ context.computeAndSetTypeEnvironmentForOperator(op1);
+ return brokerScan;
+
+ }
+
private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
IAType resultType) {
@@ -313,9 +426,10 @@
* 3. The subscriptions scan
* 4. Commit operator
*
- * param is the name of the channel when searching for a subscriptions scan
+ * param1 is the name of the expected subscriptions dataset when searching for a subscriptions scan
+ * and param2 is the type of the subscriptions dataset (channel or broker)
*/
- private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param) {
+ private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
if (!op.hasInputs()) {
return null;
}
@@ -325,7 +439,7 @@
return op;
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- searchId, param);
+ searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
@@ -336,7 +450,7 @@
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- searchId, param);
+ searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
@@ -344,11 +458,11 @@
}
else if (searchId == 3) {
- if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param)) {
+ if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param1, param2)) {
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- searchId, param);
+ searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
@@ -362,7 +476,7 @@
}
} else {
AbstractLogicalOperator nestedOp =
- findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param);
+ findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
@@ -385,12 +499,12 @@
return false;
}
- private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+ private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName, String subscriptionType) {
if (op instanceof DataSourceScanOperator) {
if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
- && dds.getDataset().getItemTypeName().equals(BADConstants.BrokerSubscriptionsType)) {
+ && dds.getDataset().getItemTypeName().equals(subscriptionType)) {
if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
return true;
}