Delay result type evaluation until creating physical operator
Change-Id: Iabc5288b6cc5d4cdfa2ffdaca27e8caa813dc6d6
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..258aaf1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -31,7 +31,6 @@
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -44,7 +43,6 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -182,10 +180,9 @@
}
private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
- LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
- IAType resultType) {
+ LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push) {
NotifyBrokerOperator notifyBrokerOp =
- new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType);
+ new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push);
EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
@@ -203,12 +200,10 @@
while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue();
}
- IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context);
- IAType resultType = (IAType) env.getVarType(sendVar);
//Create the NotifyBrokerOperator
DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse,
- channelName, true, resultType);
+ channelName, true);
extensionOp.getInputs().add(new MutableObject<>(eOp));
context.computeAndSetTypeEnvironmentForOperator(extensionOp);
@@ -260,7 +255,7 @@
//Create the NotifyBrokerOperator
DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
- channelDataverse, channelName, false, null);
+ channelDataverse, channelName, false);
//Set the input for the distinct as the old top
extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index df0f0f4..5fc9b22 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -20,7 +20,6 @@
import java.util.Collection;
-import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
@@ -35,15 +34,13 @@
private final LogicalVariable channelExecutionVar;
private final LogicalVariable pushListVar;
private final boolean push;
- private final IAType recordType;
public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar,
- LogicalVariable resultSetVar, boolean push, IAType recordType) {
+ LogicalVariable resultSetVar, boolean push) {
this.brokerEndpointVar = brokerEndpointVar;
this.channelExecutionVar = resultSetVar;
this.pushListVar = pushListVar;
this.push = push;
- this.recordType = recordType;
}
public LogicalVariable getPushListVar() {
@@ -58,10 +55,6 @@
return channelExecutionVar;
}
- public IAType getRecordType() {
- return recordType;
- }
-
public boolean getPush() {
return push;
}
@@ -79,7 +72,7 @@
@Override
public IOperatorDelegate newInstance() {
- return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push, recordType);
+ return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push);
}
@Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index b9cfbfd..264a994 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
@@ -78,7 +79,10 @@
LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
- IAType recordType = ((NotifyBrokerOperator) notify.getDelegate()).getRecordType();
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+ IAType recordType = (IAType) env.getVarType(pushListVar);
+
boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();
int brokerColumn = inputSchemas[0].findVariable(brokerVar);