Delay result type evaluation until creating physical operator

Change-Id: Iabc5288b6cc5d4cdfa2ffdaca27e8caa813dc6d6
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..258aaf1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -31,7 +31,6 @@
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -44,7 +43,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -182,10 +180,9 @@
     }
 
     private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
-            LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
-            IAType resultType) {
+            LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push) {
         NotifyBrokerOperator notifyBrokerOp =
-                new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType);
+                new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push);
         EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
         NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
         notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
@@ -203,12 +200,10 @@
         while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
             assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue();
         }
-        IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context);
-        IAType resultType = (IAType) env.getVarType(sendVar);
 
         //Create the NotifyBrokerOperator
         DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse,
-                channelName, true, resultType);
+                channelName, true);
 
         extensionOp.getInputs().add(new MutableObject<>(eOp));
         context.computeAndSetTypeEnvironmentForOperator(extensionOp);
@@ -260,7 +255,7 @@
 
         //Create the NotifyBrokerOperator
         DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
-                channelDataverse, channelName, false, null);
+                channelDataverse, channelName, false);
 
         //Set the input for the distinct as the old top
         extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index df0f0f4..5fc9b22 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -20,7 +20,6 @@
 
 import java.util.Collection;
 
-import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
@@ -35,15 +34,13 @@
     private final LogicalVariable channelExecutionVar;
     private final LogicalVariable pushListVar;
     private final boolean push;
-    private final IAType recordType;
 
     public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar,
-            LogicalVariable resultSetVar, boolean push, IAType recordType) {
+            LogicalVariable resultSetVar, boolean push) {
         this.brokerEndpointVar = brokerEndpointVar;
         this.channelExecutionVar = resultSetVar;
         this.pushListVar = pushListVar;
         this.push = push;
-        this.recordType = recordType;
     }
 
     public LogicalVariable getPushListVar() {
@@ -58,10 +55,6 @@
         return channelExecutionVar;
     }
 
-    public IAType getRecordType() {
-        return recordType;
-    }
-
     public boolean getPush() {
         return push;
     }
@@ -79,7 +72,7 @@
 
     @Override
     public IOperatorDelegate newInstance() {
-        return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push, recordType);
+        return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push);
     }
 
     @Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index b9cfbfd..264a994 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
@@ -78,7 +79,10 @@
         LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
         LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
         LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
-        IAType recordType = ((NotifyBrokerOperator) notify.getDelegate()).getRecordType();
+
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+        IAType recordType = (IAType) env.getVarType(pushListVar);
+
         boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();
 
         int brokerColumn = inputSchemas[0].findVariable(brokerVar);