blob: 2596421ea373329fd03c415fb8bc5850456d2d6c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.rules;
import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.operators.CommitOperator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.FunctionInfo;
import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule {
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
return false;
}
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
boolean push = false;
AbstractLogicalOperator op = findOp(op1, LogicalOperatorTag.DELEGATE_OPERATOR, "", "");
if (op == null) {
push = true;
}
DataSourceScanOperator subscriptionsScan;
String channelDataverse;
String channelName;
AssignOperator pushAssign = null;
AssignOperator newAssign = null;
UnnestOperator pushUnnest = null;
LogicalVariable brokerSubsVar = null;
LogicalVariable brokerEndpoint = context.newVar();
LogicalVariable brokerSubId = null;
if (!push) {
AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
return false;
}
InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
return false;
}
DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
String datasetName = dds.getDataset().getDatasetName();
if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
|| !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
|| !datasetName.endsWith("Results")) {
return false;
}
channelDataverse = dds.getDataset().getDataverseName();
//Now we know that we are inserting into results
channelName = datasetName.substring(0, datasetName.length() - 7);
String subscriptionsName = channelName + "ChannelSubscriptions";
subscriptionsScan =
(DataSourceScanOperator) findOp(op, LogicalOperatorTag.DATASOURCESCAN, subscriptionsName,
BADConstants.ChannelSubscriptionsType);
if (subscriptionsScan == null) {
return false;
}
} else {
op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
return false;
}
ProjectOperator pushProject = (ProjectOperator) op;
AbstractLogicalOperator op2 = (AbstractLogicalOperator) pushProject.getInputs().get(0).getValue();
if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
return false;
}
pushAssign = (AssignOperator) op2;
//if push, get the channel name here instead
subscriptionsScan = (DataSourceScanOperator) findOp(op, LogicalOperatorTag.DATASOURCESCAN, "",
BADConstants.ChannelSubscriptionsType);
if (subscriptionsScan == null) {
return false;
}
DatasetDataSource dds = (DatasetDataSource) subscriptionsScan.getDataSource();
String datasetName = dds.getDataset().getDatasetName();
channelDataverse = dds.getDataset().getDataverseName();
channelName = datasetName.substring(0, datasetName.length() - 13);
AbstractLogicalOperator unnest = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue();
if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) {
return false;
}
pushUnnest = (UnnestOperator) unnest;
}
//The channelExecutionTime is created just before the scan
ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
return false;
}
LogicalVariable channelExecutionVar = ((AssignOperator) channelExecutionAssign).getVariables().get(0);
if (!channelExecutionVar.toString().equals("$$" + BADConstants.ChannelExecutionTime)) {
return false;
}
if (!push) {
//move broker scan
SubplanOperator subplanOperator = (SubplanOperator) findOp(op, LogicalOperatorTag.SUBPLAN, "", "");
if (subplanOperator == null) {
return false;
}
brokerSubId = context.newVar();
brokerSubsVar = ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
.getVariables().get(0);
newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context);
context.computeAndSetTypeEnvironmentForOperator(op1);
//Maintain the variables through the existing project
ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", "");
badProject.getVariables().add(channelExecutionVar);
badProject.getVariables().add(brokerSubsVar);
context.computeAndSetTypeEnvironmentForOperator(badProject);
}
//Create my brokerNotify plan above the extension Operator
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpoint, channelExecutionVar, context, pushAssign, channelDataverse,
channelName, pushUnnest)
: createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign,
channelDataverse, channelName);
opRef.setValue(dOp);
return true;
}
private AssignOperator createAssignsAndUnnest(LogicalVariable brokerSubsVar, LogicalVariable brokerEndpoint,
LogicalVariable brokerSubId, AbstractLogicalOperator op, IOptimizationContext context)
throws AlgebricksException {
FunctionInfo finfoGetField =
(FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
FunctionInfo finfoScanCollection =
(FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION);
UnnestingFunctionCallExpression scanBrokerSubsVar = new UnnestingFunctionCallExpression(finfoScanCollection,
new MutableObject<>(new VariableReferenceExpression(brokerSubsVar)));
LogicalVariable unnestedBrokerSubsVar = context.newVar();
UnnestOperator unnestBrokerSubsVar =
new UnnestOperator(unnestedBrokerSubsVar, new MutableObject<>(scanBrokerSubsVar));
unnestBrokerSubsVar.getInputs().add(new MutableObject<>(op));
ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)), new MutableObject<>(
new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
AssignOperator assignBrokerEndPoint =
new AssignOperator(brokerEndpoint, new MutableObject<>(getBrokerEndPoint));
assignBrokerEndPoint.getInputs().add(new MutableObject<>(unnestBrokerSubsVar));
ScalarFunctionCallExpression getBrokerSubId = new ScalarFunctionCallExpression(finfoGetField,
new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)),
new MutableObject<>(new ConstantExpression(
new AsterixConstantValue(new AString(BADConstants.BrokerSubscriptionId)))));
AssignOperator assignBrokerSubId = new AssignOperator(brokerSubId, new MutableObject<>(getBrokerSubId));
assignBrokerSubId.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
context.computeAndSetTypeEnvironmentForOperator(unnestBrokerSubsVar);
context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
context.computeAndSetTypeEnvironmentForOperator(assignBrokerSubId);
return assignBrokerSubId;
}
private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
IAType resultType) {
NotifyBrokerOperator notifyBrokerOp =
new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType);
EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
extensionOp.setPhysicalOperator(notifyBrokerPOp);
return extensionOp;
}
private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar,
LogicalVariable channelExecutionVar, IOptimizationContext context, AssignOperator payLoadAssign,
String channelDataverse, String channelName, UnnestOperator pushUnnest)
throws AlgebricksException {
IVariableTypeEnvironment env = payLoadAssign.computeOutputTypeEnvironment(context);
IAType resultType = (IAType) env.getVarType(payLoadAssign.getVariables().get(0));
//Create the NotifyBrokerOperator
DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, payLoadAssign.getVariables().get(0),
channelExecutionVar, channelDataverse,
channelName, true, resultType);
extensionOp.getInputs().add(new MutableObject<>(payLoadAssign));
FunctionInfo finfoGetField = (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
new MutableObject<>(new VariableReferenceExpression(pushUnnest.getVariable())), new MutableObject<>(
new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
AssignOperator assignBrokerEndPoint =
new AssignOperator(brokerEndpointVar, new MutableObject<>(getBrokerEndPoint));
assignBrokerEndPoint.getInputs().addAll(payLoadAssign.getInputs());
payLoadAssign.getInputs().clear();
payLoadAssign.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
context.computeAndSetTypeEnvironmentForOperator(payLoadAssign);
context.computeAndSetTypeEnvironmentForOperator(extensionOp);
return extensionOp;
}
private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar,
LogicalVariable sendVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
ILogicalOperator eOp, String channelDataverse, String channelName)
throws AlgebricksException {
//Create the Distinct Op
ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<>();
VariableReferenceExpression vExpr = new VariableReferenceExpression(sendVar);
expressions.add(new MutableObject<>(vExpr));
DistinctOperator distinctOp = new DistinctOperator(expressions);
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>();
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>();
List<ILogicalPlan> nestedPlans = new ArrayList<>();
//Create GroupBy operator
GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
//Set the distinct as input
groupbyOp.getInputs().add(new MutableObject<>(distinctOp));
//create nested plan for subscription ids in group by
NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(new MutableObject<>(groupbyOp));
LogicalVariable sendListVar = context.newVar();
List<LogicalVariable> aggVars = new ArrayList<>();
aggVars.add(sendListVar);
AggregateFunctionCallExpression funAgg =
BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.LISTIFY, new ArrayList<>());
funAgg.getArguments().add(new MutableObject<>(new VariableReferenceExpression(sendVar)));
List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<>();
aggExpressions.add(new MutableObject<>(funAgg));
AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
listifyOp.getInputs().add(new MutableObject<>(nestedTupleSourceOp));
//add nested plans
nestedPlans.add(new ALogicalPlanImpl(new MutableObject<>(listifyOp)));
//Create the NotifyBrokerOperator
DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
channelDataverse, channelName, false, null);
//Set the input for the distinct as the old top
extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
distinctOp.getInputs().add(new MutableObject<>(eOp));
//compute environment bottom up
context.computeAndSetTypeEnvironmentForOperator(distinctOp);
context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
context.computeAndSetTypeEnvironmentForOperator(listifyOp);
context.computeAndSetTypeEnvironmentForOperator(extensionOp);
return extensionOp;
}
/**
* Find Specific Operators within the plan
*
* @param op
* The operator to begin the search at
* @param searchTag
* The type of operator to find
* @param subscriptionsName
* When searching for a scan, pass the name of the dataset
* @param subscriptionType
* When searching for a scan, pass the type of the dataset
* @return
*/
private AbstractLogicalOperator findOp(AbstractLogicalOperator op, LogicalOperatorTag searchTag,
String subscriptionsName, String subscriptionType) {
if (!op.hasInputs()) {
return null;
}
for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
if (subOp.getValue().getOperatorTag() != searchTag) {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
subscriptionsName, subscriptionType);
if (nestedOp != null) {
return nestedOp;
}
} else {
if (searchTag == LogicalOperatorTag.SUBPLAN) {
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
ScalarFunctionCallExpression resultCreation =
(ScalarFunctionCallExpression) ((AssignOperator) op).getExpressions().get(0).getValue();
resultCreation.getArguments().remove(resultCreation.getArguments().size() - 1);
resultCreation.getArguments().remove(resultCreation.getArguments().size() - 1);
return (AbstractLogicalOperator) subOp.getValue();
}
} else if (searchTag == LogicalOperatorTag.PROJECT) {
return (AbstractLogicalOperator) subOp.getValue();
} else if (searchTag == LogicalOperatorTag.DATASOURCESCAN) {
if (isSubscriptionsScan(
(AbstractLogicalOperator) subOp.getValue(), subscriptionsName, subscriptionType)) {
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
subscriptionsName, subscriptionType);
if (nestedOp != null) {
return nestedOp;
}
}
} else if (searchTag == LogicalOperatorTag.DELEGATE_OPERATOR) {
DelegateOperator dOp = (DelegateOperator) subOp.getValue();
if (dOp.getDelegate() instanceof CommitOperator) {
return (AbstractLogicalOperator) subOp.getValue();
}
}
}
}
return null;
}
private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName, String subscriptionType) {
if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
&& dds.getDataset().getItemTypeName().equals(subscriptionType)) {
if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
return true;
}
}
}
return false;
}
}