Samza-2631: Samza-sql: Revert structured data support (#1472)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index c4138ea..b7cee00 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -201,12 +201,10 @@
             .collect(Collectors.toList());
         return avroList;
       case MAP:
-        // If you ask why not using String and that is because some strings are Wrapped into org.apache.avro.util.Utf8
-        // TODO looking at the Utf8 code base it is not immutable, having it as a key is calling for trouble!
-        final Map<Object, Object> outputMap = new HashMap<>();
-        ((Map<Object, Object>) relObj).forEach((key, aValue) -> outputMap.put(key,
-            convertToAvroObject(aValue, getNonNullUnionSchema(schema).getValueType())));
-        return outputMap;
+        return ((Map<String, ?>) relObj).entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey,
+              e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())));
       case UNION:
         for (Schema unionSchema : schema.getTypes()) {
           if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index d837e03..5dddf15 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -148,10 +148,8 @@
   }
 
   private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean isNullable, boolean isOptional) {
-    if (types.size() == 1) {
-      return convertField(types.get(0), true, true);
-    } else if (types.size() == 2) {
-      // Typically a nullable field's schema is configured as an union of Null and a Type.
+    // Typically a nullable field's schema is configured as an union of Null and a Type.
+    if (types.size() == 2) {
       if (types.get(0).getType() == Schema.Type.NULL) {
         return convertField(types.get(1), true, true);
       } else if (types.get(1).getType() == Schema.Type.NULL) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
new file mode 100644
index 0000000..4ef2a11
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.sql.schema.SamzaSqlFieldType;
+import org.apache.samza.sql.udfs.SamzaSqlUdf;
+import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+@SamzaSqlUdf(name = "GetNestedField", description = "UDF that extracts a field value from a nested SamzaSqlRelRecord")
+public class GetNestedFieldUdf implements ScalarUdf {
+  @Override
+  public void init(Config udfConfig, Context context) {
+  }
+
+  @SamzaSqlUdfMethod(params = {SamzaSqlFieldType.ANY, SamzaSqlFieldType.STRING},
+      returns = SamzaSqlFieldType.ANY)
+  public Object execute(Object currentFieldOrValue, String fieldName) {
+    GetSqlFieldUdf udf = new GetSqlFieldUdf();
+    return udf.getSqlField(currentFieldOrValue, fieldName);
+  }
+}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
index 60794ef..ccbee6a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -98,11 +98,11 @@
         if (parsedSqlArgType.getSqlTypeName() == SqlTypeName.CHAR && udfArgumentAsSqlType == SqlTypeName.VARCHAR) {
           return true;
         } else if (!Objects.equals(parsedSqlArgType.getSqlTypeName(), udfArgumentAsSqlType)
-            && !ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) && hasOneUdfMethod(udfMetadata)) {
+                && !ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) && hasOneUdfMethod(udfMetadata)) {
           // 3(b). Throw up and fail on mismatch between the SamzaSqlType and CalciteType for any argument.
-          String msg = String.format(
-              "Type mismatch in udf class: %s at argument index: %d." + "Expected type: %s, actual type: %s.",
-              udfMetadata.getName(), udfArgumentIndex, udfArgumentAsSqlType, parsedSqlArgType.getSqlTypeName());
+          String msg = String.format("Type mismatch in udf class: %s at argument index: %d." +
+                          "Expected type: %s, actual type: %s.", udfMetadata.getName(),
+                  udfArgumentIndex, parsedSqlArgType.getSqlTypeName(), udfArgumentAsSqlType);
           LOG.error(msg);
           throw new SamzaSqlValidatorException(msg);
         }
@@ -159,9 +159,8 @@
   static SqlTypeName toCalciteSqlType(SamzaSqlFieldType samzaSqlFieldType) {
     switch (samzaSqlFieldType) {
       case ANY:
-        return SqlTypeName.ANY;
       case ROW:
-        return SqlTypeName.ROW;
+        return SqlTypeName.ANY;
       case MAP:
         return SqlTypeName.MAP;
       case ARRAY:
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index dc37753..767df43 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -20,13 +20,15 @@
 package org.apache.samza.sql.planner;
 
 import com.google.common.collect.ImmutableList;
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
@@ -60,10 +62,10 @@
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.schema.SamzaSqlFieldType;
 import org.apache.samza.sql.schema.SqlFieldSchema;
 import org.apache.samza.sql.schema.SqlSchema;
+import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,42 +122,56 @@
   }
 
   private Planner getPlanner() {
-    Planner planner;
-    SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
-    registerSourceSchemas(rootSchema);
+    Planner planner = null;
+    try {
+      Connection connection = DriverManager.getConnection("jdbc:calcite:");
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      SchemaPlus rootSchema = calciteConnection.getRootSchema();
+      registerSourceSchemas(rootSchema);
 
-    List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions =
-        udfMetadata.stream().map(SamzaSqlScalarFunctionImpl::new).collect(Collectors.toList());
+      List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
+          .map(SamzaSqlScalarFunctionImpl::new)
+          .collect(Collectors.toList());
 
-    final List<RelTraitDef> traitDefs = new ArrayList<>();
+      final List<RelTraitDef> traitDefs = new ArrayList<>();
 
-    traitDefs.add(ConventionTraitDef.INSTANCE);
-    traitDefs.add(RelCollationTraitDef.INSTANCE);
+      traitDefs.add(ConventionTraitDef.INSTANCE);
+      traitDefs.add(RelCollationTraitDef.INSTANCE);
 
-    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-    sqlOperatorTables.add(new SamzaSqlOperatorTable());
-    sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+      sqlOperatorTables.add(new SamzaSqlOperatorTable());
+      sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
 
-    // TODO: Introduce a pluggable rule factory.
-    List<RelOptRule> rules = ImmutableList.of(FilterProjectTransposeRule.INSTANCE, ProjectMergeRule.INSTANCE,
-        new SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, RelFactories.LOGICAL_BUILDER,
-            systemStreamConfigBySource));
+      // TODO: Introduce a pluggable rule factory.
+      List<RelOptRule> rules = ImmutableList.of(
+          FilterProjectTransposeRule.INSTANCE,
+          ProjectMergeRule.INSTANCE,
+          new SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, RelFactories.LOGICAL_BUILDER,
+          systemStreamConfigBySource));
 
-    // Using lenient so that !=,%,- are allowed.
-    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.configBuilder()
-            .setLex(Lex.JAVA)
-            .setConformance(SqlConformanceEnum.LENIENT)
-            .setCaseSensitive(false) // Make Udfs case insensitive
-            .build())
-        .defaultSchema(rootSchema)
-        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-        .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
-        .traitDefs(traitDefs)
-        .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
-        .build();
-    planner = Frameworks.getPlanner(frameworkConfig);
-    return planner;
+      // Using lenient so that !=,%,- are allowed.
+      FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+          .parserConfig(SqlParser.configBuilder()
+              .setLex(Lex.JAVA)
+              .setConformance(SqlConformanceEnum.LENIENT)
+              .setCaseSensitive(false) // Make Udfs case insensitive
+              .build())
+          .defaultSchema(rootSchema)
+          .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+          .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
+          .traitDefs(traitDefs)
+          .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
+          .build();
+      planner = Frameworks.getPlanner(frameworkConfig);
+      return planner;
+    } catch (Exception e) {
+      String errorMsg = "Failed to create planner.";
+      LOG.error(errorMsg, e);
+      if (planner != null) {
+        planner.close();
+      }
+      throw new SamzaException(errorMsg, e);
+    }
   }
 
   private RelRoot optimize(Planner planner, RelRoot relRoot) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index fcc289c..c3735a4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -21,13 +21,11 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.type.ArraySqlType;
 import org.apache.calcite.sql.type.MapSqlType;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
@@ -96,11 +94,6 @@
       case INT64:
         return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true);
       case ROW:
-        final RelDataType rowType = convertToRelSchema(fieldSchema.getRowSchema());
-        /* Using Fully Qualified names to ensure that at the last project the row is fully reconstructed */
-        return createTypeWithNullability(createStructType(StructKind.FULLY_QUALIFIED,
-            rowType.getFieldList().stream().map(RelDataTypeField::getType).collect(Collectors.toList()),
-            rowType.getFieldNames()), true);
       case ANY:
         // TODO Calcite execution engine doesn't support record type yet.
         return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index 6b8e8ba..3098af7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -35,7 +35,6 @@
 import org.apache.calcite.sql.fun.SqlRowOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
-import org.apache.samza.sql.udf.GetNestedField;
 
 
 /**
@@ -144,7 +143,6 @@
   public static final SqlFunction TUMBLE = SqlStdOperatorTable.TUMBLE;
   public static final SqlFunction TUMBLE_END = SqlStdOperatorTable.TUMBLE_END;
   public static final SqlFunction TUMBLE_START = SqlStdOperatorTable.TUMBLE_START;
-  public static final SqlFunction GET_NESTED_FIELD_OP = GetNestedField.INSTANCE;
 
   public SamzaSqlOperatorTable() {
     init();
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index b1b9001..86af594 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -61,7 +61,6 @@
    * NOTE: This constructor is called from {@link ApplicationRunners} through reflection.
    * Please refrain from updating the signature or removing this constructor unless the caller has changed the interface.
    */
-  @SuppressWarnings("unused") /* used via reflection */
   public SamzaSqlApplicationRunner(SamzaApplication app, Config config) {
     this(app, false, config);
   }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index b42569c..604f061 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -80,7 +80,6 @@
       this.context = context;
       this.translatorContext = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.filter = (LogicalFilter) this.translatorContext.getRelNode(filterId);
-      LOG.info("Compiling Operator {}", filter.getDigest());
       this.expr = this.translatorContext.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
       ContainerContext containerContext = context.getContainerContext();
       metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -97,10 +96,9 @@
     public boolean apply(SamzaSqlRelMessage message) {
       long startProcessing = System.nanoTime();
       Object[] result = new Object[1];
-      Object[] inputRow = ProjectTranslator.convertToJavaRow(message.getSamzaSqlRelRecord());
       try {
         expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
-            inputRow, result);
+            message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
       } catch (Exception e) {
         String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
         LOG.error(errMsg, e);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
index c51ff25..1a0a7ec 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -74,7 +74,7 @@
   }
 
   String getSourceName() {
-    return SqlIOConfig.getSourceFromSourceParts(getTableScan(relNode).getTable().getQualifiedName());
+    return SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
   }
 
   boolean isPosOnRight() {
@@ -109,12 +109,4 @@
     }
   }
 
-  private static TableScan getTableScan(RelNode relNode) {
-    if (relNode instanceof TableScan) {
-      return (TableScan) relNode;
-    }
-    // we deal with Single inputs filter/project
-    assert relNode.getInputs().size() == 1;
-    return getTableScan(relNode.getInput(0));
-  }
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 26d9fa0..02c434d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -40,7 +40,6 @@
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlKind;
@@ -80,7 +79,7 @@
 class JoinTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
-  private final String logicalOpId;
+  private String logicalOpId;
   private final String intermediateStreamPrefix;
   private final int queryId;
   private final TranslatorInputMetricsMapFunction inputMetricsMF;
@@ -166,16 +165,8 @@
 
     if (tableNode.isRemoteTable()) {
       String remoteTableName = tableNode.getSourceName();
-      MessageStream<SamzaSqlRelMessage> operatorStack = context.getMessageStream(tableNode.getRelNode().getId());
-      final  StreamTableJoinFunction<Object, SamzaSqlRelMessage, KV, SamzaSqlRelMessage> joinFn;
-      if (operatorStack != null && operatorStack instanceof MessageStreamCollector) {
-        joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
-            context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId,
-            (MessageStreamCollector) operatorStack);
-      } else {
-        joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
-            context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
-      }
+      StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+          context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
 
       return inputStream
           .map(inputMetricsMF)
@@ -184,11 +175,7 @@
 
     // Join with the local table
 
-    StreamTableJoinFunction<SamzaSqlRelRecord,
-        SamzaSqlRelMessage,
-        KV<SamzaSqlRelRecord, SamzaSqlRelMessage>,
-        SamzaSqlRelMessage>
-        joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
+    StreamTableJoinFunction joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
 
     SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
         (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
@@ -283,8 +270,8 @@
         isTablePosOnRight ? join.getRowType().getFieldCount() : join.getLeft().getRowType().getFieldCount();
 
     List<Integer> tableRefsIdx = refCollector.stream()
-        .map(RexSlot::getIndex)
-        .filter(x -> (tableStartIndex <= x) && (x < tableEndIndex)) // collect all the refs form table side
+        .map(x -> x.getIndex())
+        .filter(x -> tableStartIndex <= x && x < tableEndIndex) // collect all the refs form table side
         .map(x -> x - tableStartIndex) // re-adjust the offset
         .sorted()
         .collect(Collectors.toList()); // we have a list with all the input from table side with 0 based index.
@@ -357,12 +344,11 @@
   private void validateJoinKeyType(RexInputRef ref) {
     SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
 
-    // Primitive types and Row (for the record key) and ANY for other fields like __key__
-    // TODO this need to be pulled to a common class/place that have all the supported types
+    // Primitive types and ANY (for the record key) are supported in the key
     if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
         && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
         && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
-        && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER && sqlTypeName != SqlTypeName.ROW) {
+        && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER) {
       log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
       throw new SamzaException("Unsupported key type used in join condition.");
     }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
deleted file mode 100644
index 0ddc136..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.translator;
-
-import java.io.Closeable;
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.function.Function;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.AsyncFlatMapFunction;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.table.Table;
-
-
-/**
- * Collector of Map and Filter Samza Functions to collect call stack on the top of Remote table.
- * This Collector will be used by Join operator and trigger it when applying the join function post lookup.
- *
- * Note that this is needed because the Remote Table can not expose a proper {@code MessageStream}.
- * It is a work around to minimize the amount of code changes of the current Query Translator {@link org.apache.samza.sql.translator.QueryTranslator},
- * But in an ideal world, we should use Calcite planner in conventional way we can combine function when via translation of RelNodes.
- */
-class MessageStreamCollector implements MessageStream<SamzaSqlRelMessage>, Serializable, Closeable {
-
-  /**
-   * Queue First in First to be Fired order of the operators on the top of Remote Table Scan.
-   */
-  private final Deque<MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>> mapFnCallQueue =
-      new ArrayDeque<>();
-
-  /**
-   * Function to chain the call to close from each operator.
-   */
-  private transient Function<Void, Void> closeFn = aVoid -> null;
-
-  @Override
-  public <OM> MessageStream<OM> map(MapFunction<? super SamzaSqlRelMessage, ? extends OM> mapFn) {
-    mapFnCallQueue.offer((MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>) mapFn);
-    return (MessageStream<OM>) this;
-  }
-
-  @Override
-  public MessageStream<SamzaSqlRelMessage> filter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
-    mapFnCallQueue.offer(new FilterMapAdapter(filterFn));
-    return this;
-  }
-
-  /**
-   * This function is called by the join operator on run time to apply filter and projects post join lookup.
-   *
-   * @param context Samza Execution Context
-   * @return {code null} case filter reject the row, Samza Relational Record as it goes via Projects.
-   */
-  Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
-    Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
-    Function<Void, Void> intFn = aVoid -> null; // Projects and Filters both need to be initialized.
-    closeFn = aVoid -> null;
-    // At this point we have a the queue of operator, where first in is the first operator on top of TableScan.
-    while (!mapFnCallQueue.isEmpty()) {
-      MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage> f = mapFnCallQueue.poll();
-      intFn = intFn.andThen((aVoid) -> {
-        f.init(context);
-        return null;
-      });
-      closeFn.andThen((aVoid) -> {
-        f.close();
-        return null;
-      });
-
-      Function<SamzaSqlRelMessage, SamzaSqlRelMessage> current = x -> x == null ? null : f.apply(x);
-      if (tailFn == null) {
-        tailFn = current;
-      } else {
-        tailFn = current.compose(tailFn);
-      }
-    }
-    // TODO TBH not sure about this need to check if Samza Framework will be okay with late init call.
-    intFn.apply(null); // Init call has to happen here.
-    return tailFn == null ? Function.identity() : tailFn;
-  }
-
-  /**
-   * Filter adapter is used to compose filters with {@code MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage>}
-   * Filter function will return {@code null} when input is {@code null} or filter condition reject current row.
-   */
-  private static class FilterMapAdapter implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
-    private final FilterFunction<? super SamzaSqlRelMessage> filterFn;
-
-    private FilterMapAdapter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
-      this.filterFn = filterFn;
-    }
-
-    @Override
-    public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
-      if (message != null && filterFn.apply(message)) {
-        return message;
-      }
-      // null on case no match
-      return null;
-    }
-
-    @Override
-    public void close() {
-      filterFn.close();
-    }
-
-    @Override
-    public void init(Context context) {
-      filterFn.init(context);
-    }
-  }
-
-  @Override
-  public void close() {
-    if (closeFn != null) {
-      closeFn.apply(null);
-    }
-  }
-
-  @Override
-  public <OM> MessageStream<OM> flatMap(FlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> flatMapFn) {
-    return null;
-  }
-
-  @Override
-  public <OM> MessageStream<OM> flatMapAsync(
-      AsyncFlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> asyncFlatMapFn) {
-    return null;
-  }
-
-  @Override
-  public void sink(SinkFunction<? super SamzaSqlRelMessage> sinkFn) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public MessageStream<SamzaSqlRelMessage> sendTo(OutputStream<SamzaSqlRelMessage> outputStream) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<SamzaSqlRelMessage, K, WV> window, String id) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
-      JoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super OM, ? extends JM> joinFn, Serde<K> keySerde,
-      Serde<SamzaSqlRelMessage> messageSerde, Serde<OM> otherMessageSerde, Duration ttl, String id) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
-      StreamTableJoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super R, ? extends JM> joinFn,
-      Object... args) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public MessageStream<SamzaSqlRelMessage> merge(
-      Collection<? extends MessageStream<? extends SamzaSqlRelMessage>> otherStreams) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super SamzaSqlRelMessage, ? extends K> keyExtractor,
-      MapFunction<? super SamzaSqlRelMessage, ? extends V> valueExtractor, KVSerde<K, V> serde, String id) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object... args) {
-    throw new IllegalStateException("Not valid state");
-  }
-
-  @Override
-  public MessageStream<SamzaSqlRelMessage> broadcast(Serde<SamzaSqlRelMessage> serde, String id) {
-    throw new IllegalStateException("Not valid state");
-  }
-}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 79f58e8..1ebea5f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,14 +19,11 @@
 
 package org.apache.samza.sql.translator;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -40,9 +37,7 @@
 import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.Expression;
-import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
@@ -54,7 +49,7 @@
  * Translator to translate the Project node in the relational graph to the corresponding StreamGraph
  * implementation.
  */
-public class ProjectTranslator {
+class ProjectTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class);
   //private transient int messageIndex = 0;
@@ -65,122 +60,6 @@
   }
 
   /**
-   * Converts the resulting row from Calcite Expression Evaluator to SamzaRelRecord to be sent downstream.
-   *
-   * @param objects input objects to be converted
-   * @param rowType Calcite row type of the resulting row
-   * @return return a valid message Stream of type SamzaSqlRelRecord
-   */
-  public static SamzaSqlRelRecord buildSamzaRelRecord(Object[] objects, RelDataType rowType) {
-    Preconditions.checkNotNull(objects, "Input objects can not be null");
-    Preconditions.checkState(rowType.isStruct(), "Row Type has to be a Struct and got " + rowType.getSqlTypeName());
-    Preconditions.checkState(objects.length == rowType.getFieldCount(),
-        "Objects counts and type counts must match " + objects.length + " vs " + rowType.getFieldCount());
-    List<String> names = new ArrayList<>(rowType.getFieldNames());
-    List<Object> values = new ArrayList<>(rowType.getFieldCount());
-    for (int i = 0; i < objects.length; i++) {
-      Object val = objects[i];
-      if (val == null) {
-        values.add(null);
-        continue;
-      }
-      final RelDataType valueType = rowType.getFieldList().get(i).getType();
-      values.add(convertToSamzaSqlType(val, valueType));
-    }
-    return new SamzaSqlRelRecord(names, values);
-  }
-
-  /**
-   * Recursively converts a Primitive Java Object to valid Samza Rel Record field type.
-   *
-   * @param value input value to be converted
-   * @param dataType value type as derived by Calcite
-   * @return SamzaRelRecord or primitive SamzaRelRecord field.
-   *
-   */
-  private static Object convertToSamzaSqlType(Object value, RelDataType dataType) {
-    if (value == null) {
-      return null;
-    }
-    switch (dataType.getSqlTypeName()) {
-      case ROW:
-        List<String> names = new ArrayList<>(dataType.getFieldNames());
-        // Row Struct is represent as Object array in Calcite.
-        Object[] row = (Object[]) value;
-        List<Object> values = new ArrayList<>(row.length);
-        for (int i = 0; i < row.length; i++) {
-          values.add(convertToSamzaSqlType(row[i], dataType.getFieldList().get(i).getType()));
-        }
-        return new SamzaSqlRelRecord(names, values);
-      case MAP:
-        Map<Object, Object> objectMap = (Map<Object, Object>) value;
-        Map<Object, Object> resultMap = new HashMap<>();
-        final RelDataType valuesType = dataType.getValueType();
-        objectMap.forEach((key, v) -> resultMap.put(key, convertToSamzaSqlType(v, valuesType)));
-        return resultMap;
-      case ARRAY:
-        List<Object> objectList = (List<Object>) value;
-        final RelDataType elementsType = dataType.getComponentType();
-        return objectList.stream().map(e -> convertToSamzaSqlType(e, elementsType)).collect(Collectors.toList());
-      case BOOLEAN:
-      case BIGINT:
-      case BINARY:
-      case INTEGER:
-      case TINYINT:
-      case DOUBLE:
-      case FLOAT:
-      case REAL:
-      case VARCHAR:
-      case CHAR:
-      case VARBINARY:
-      case ANY:
-      case OTHER:
-        // today we treat everything else as Type Any or Other, this is not ideal.
-        // this will change when adding timestamps support or more complex non java primitive types.
-        // TODO in a better world we need to add type factory that can do the conversion between calcite and samza.
-        return value;
-      default:
-        // As of today we treat everything else as type ANY
-        throw new IllegalStateException("Unknown SQL type " + dataType.getSqlTypeName());
-    }
-  }
-
-  /**
-   * Converts the Samza Record to a Java Primitive Row format that's in convention with Calcite Enum operators.
-   *
-   * @param samzaSqlRelRecord input record.
-   * @return row of Java Primitive conform to org.apache.calcite.adapter.enumerable.JavaRowFormat#ARRAY
-   */
-  public static Object[] convertToJavaRow(SamzaSqlRelRecord samzaSqlRelRecord) {
-    if (samzaSqlRelRecord == null) {
-      return null;
-    }
-    Object[] inputRow = new Object[samzaSqlRelRecord.getFieldValues().size()];
-    for (int i = 0; i < inputRow.length; i++) {
-      inputRow[i] = asPrimitiveJavaRow(samzaSqlRelRecord.getFieldValues().get(i));
-    }
-    return inputRow;
-  }
-
-  private static Object asPrimitiveJavaRow(Object inputObject) {
-    if (inputObject == null) {
-      return null;
-    }
-    if (inputObject instanceof SamzaSqlRelRecord) {
-      return convertToJavaRow((SamzaSqlRelRecord) inputObject);
-    }
-    if (inputObject instanceof List) {
-      return ((List) inputObject).stream().map(e -> asPrimitiveJavaRow(e)).collect(Collectors.toList());
-    }
-    if (inputObject instanceof Map) {
-      Map<Object, Object> objectMap = new HashMap<>();
-      ((Map<Object, Object>) inputObject).forEach((k, v) -> objectMap.put(k, asPrimitiveJavaRow(v)));
-      return objectMap;
-    }
-    return inputObject;
-  }
-
-  /**
    * ProjectMapFunction implements MapFunction to map input SamzaSqlRelMessages, one at a time, to a new
    * SamzaSqlRelMessage which consists of the projected fields
    */
@@ -214,7 +93,6 @@
       this.translatorContext =
           ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.project = (Project) this.translatorContext.getRelNode(projectId);
-      LOG.info("Compiling operator {} ", project.getDigest());
       this.expr = this.translatorContext.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
       ContainerContext containerContext = context.getContainerContext();
       metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -235,19 +113,20 @@
       long arrivalTime = System.nanoTime();
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
-      Object[] inputRow = convertToJavaRow(message.getSamzaSqlRelRecord());
-      SamzaSqlExecutionContext execContext = translatorContext.getExecutionContext();
-      DataContext dataRootContext = translatorContext.getDataContext();
       try {
-        expr.execute(execContext, context, dataRootContext, inputRow, output);
+        expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
+            message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
       } catch (Exception e) {
         String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
         LOG.error(errMsg, e);
         throw new SamzaException(errMsg, e);
       }
-      SamzaSqlRelRecord record = buildSamzaRelRecord(output, project.getRowType());
+      List<String> names = new ArrayList<>();
+      for (int index = 0; index < output.length; index++) {
+        names.add(index, project.getNamedProjects().get(index).getValue());
+      }
       updateMetrics(arrivalTime, System.nanoTime(), message.getSamzaSqlRelMsgMetadata().isNewInputMessage);
-      return new SamzaSqlRelMessage(record, message.getSamzaSqlRelMsgMetadata());
+      return new SamzaSqlRelMessage(names, Arrays.asList(output), message.getSamzaSqlRelMsgMetadata());
     }
 
     /**
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
index 8a60502..6a93b2d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
@@ -21,8 +21,6 @@
 
 import java.util.List;
 import java.util.Objects;
-import java.util.function.Function;
-import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
@@ -44,35 +42,15 @@
   private transient SamzaRelTableKeyConverter relTableKeyConverter;
   private final String tableName;
   private final int queryId;
-  /**
-   * Projection and Filter Function to apply post the join lookup. Function will return null in case filter rejects row.
-   */
-  private Function<SamzaSqlRelMessage, SamzaSqlRelMessage> projectFunction;
-  /**
-   * Projects and Filters operator queue.
-   */
-  private final MessageStreamCollector messageStreamCollector;
-
-  public SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, SamzaRelTableKeyConverter tableKeyConverter,
-      JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType, int queryId,
-      MessageStreamCollector messageStreamCollector) {
-    super(streamNode, tableNode, joinRelType);
-    this.msgConverter = msgConverter;
-    this.relTableKeyConverter = tableKeyConverter;
-    this.tableName = tableNode.getSourceName();
-    this.queryId = queryId;
-    this.messageStreamCollector = messageStreamCollector;
-  }
 
   SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, SamzaRelTableKeyConverter tableKeyConverter,
       JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType, int queryId) {
     super(streamNode, tableNode, joinRelType);
+
     this.msgConverter = msgConverter;
     this.relTableKeyConverter = tableKeyConverter;
     this.tableName = tableNode.getSourceName();
     this.queryId = queryId;
-    this.projectFunction = Function.identity();
-    this.messageStreamCollector = null;
   }
 
   @Override
@@ -81,24 +59,13 @@
         ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
     this.msgConverter = translatorContext.getMsgConverter(tableName);
     this.relTableKeyConverter = translatorContext.getTableKeyConverter(tableName);
-    if (messageStreamCollector != null) {
-      projectFunction = messageStreamCollector.getFunction(context);
-    }
   }
 
-  /**
-   * Compute the projection and filter post join lookup.
-   *
-   * @param record input record as result of lookup
-   * @return the projected row or {@code null} if Row doesn't pass the filter condition.
-   */
   @Override
-  @Nullable
   protected List<Object> getTableRelRecordFieldValues(KV record) {
     // Using the message rel converter, convert message to sql rel message and add to output values.
-    final SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
-    final SamzaSqlRelMessage result = projectFunction.apply(relMessage);
-    return result == null ? null : result.getSamzaSqlRelRecord().getFieldValues();
+    SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
+    return relMessage.getSamzaSqlRelRecord().getFieldValues();
   }
 
   @Override
@@ -109,8 +76,6 @@
       return null;
     }
     // Using the table key converter, convert message key from rel format to the record key format.
-    // TODO: On way to avoid the object type here is to ensure that:
-    // table's key is a SamzaRelRecord or any well defined type when defining the table descriptor
     return relTableKeyConverter.convertToTableKeyFormat(keyRecord);
   }
 
@@ -118,12 +83,4 @@
   public Object getRecordKey(KV record) {
     return record.getKey();
   }
-
-  @Override
-  public void close() {
-    super.close();
-    if (messageStreamCollector != null) {
-      messageStreamCollector.close();
-    }
-  }
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index e8fa451..cd7ecdc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -21,7 +21,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.commons.lang3.Validate;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -49,7 +48,6 @@
   // Table field names are used in the outer join when the table record is not found.
   private final ArrayList<String> tableFieldNames;
   private final ArrayList<String> outFieldNames;
-  final private List<Object> nullRow;
 
   SamzaSqlTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType) {
     this.joinRelType = joinRelType;
@@ -71,7 +69,6 @@
       outFieldNames.addAll(tableFieldNames);
       outFieldNames.addAll(streamNode.getFieldNames());
     }
-    nullRow = tableFieldNames.stream().map(x -> null).collect(Collectors.toList());
   }
 
   @Override
@@ -96,10 +93,7 @@
 
     // Add the table record fields.
     if (record != null) {
-      List<Object> row = getTableRelRecordFieldValues(record);
-      // null in case the filter did not match thus row has to be removed if inner join or padded null case outer join
-      if (row == null && joinRelType.compareTo(JoinRelType.INNER) == 0) return null;
-      outFieldValues.addAll(row == null ? nullRow : row);
+      outFieldValues.addAll(getTableRelRecordFieldValues(record));
     } else {
       // Table record could be null as the record could not be found in the store. This can
       // happen for outer joins. Add nulls to all the field values in the output message.
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 87c4e00..f58140d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -170,10 +170,7 @@
     // SqlIOResolverFactory.
     // For local table, even though table descriptor is already defined, we still need to create the input stream
     // descriptor to load the local table.
-    // To handle case where a project or filter is pushed to Remote table Scan will collect the operators and feed it to the join operator.
-    // TODO In an ideal world this has to change and use Calcite Pattern matching to translate the plan.
     if (isRemoteTable) {
-      context.registerMessageStream(tableScan.getId(), new MessageStreamCollector());
       return;
     }
 
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 7d85652..5990897 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -34,7 +34,6 @@
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.sql.data.RexToJavaCompiler;
@@ -53,7 +52,7 @@
   private final RexToJavaCompiler compiler;
   private final Map<String, SamzaRelConverter> relSamzaConverters;
   private final Map<String, SamzaRelTableKeyConverter> relTableKeyConverters;
-  private final Map<Integer, MessageStream<SamzaSqlRelMessage>> messageStreams;
+  private final Map<Integer, MessageStream> messageStreams;
   private final Map<Integer, RelNode> relNodes;
   private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
 
@@ -200,7 +199,7 @@
    * @param id the id
    * @return the message stream
    */
-  MessageStream<SamzaSqlRelMessage> getMessageStream(int id) {
+  MessageStream getMessageStream(int id) {
     return messageStreams.get(id);
   }
 
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java b/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
deleted file mode 100644
index 87cf486..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.udf;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.calcite.adapter.enumerable.CallImplementor;
-import org.apache.calcite.adapter.enumerable.EnumUtils;
-import org.apache.calcite.adapter.enumerable.NullPolicy;
-import org.apache.calcite.adapter.enumerable.RexImpTable;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.tree.ConstantExpression;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.ExpressionType;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.FunctionParameter;
-import org.apache.calcite.schema.ImplementableFunction;
-import org.apache.calcite.schema.ScalarFunction;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperandCountRange;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
-
-import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
-
-
-/**
- * Operator to extract nested Rows or Fields form a struct row type using a dotted path.
- * The goal of this operator is two-fold.
- * First it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
- * Second it will enable smooth backward compatible migration from existing udf that relies on legacy row format.
- */
-public class GetNestedField extends SqlUserDefinedFunction {
-
-  public static final SqlFunction INSTANCE = new GetNestedField(new ExtractFunction());
-
-  public GetNestedField(Function function) {
-    super(new SqlIdentifier("GetNestedField", SqlParserPos.ZERO), null, null, null, ImmutableList.of(), function);
-  }
-
-  @Override
-  public SqlOperandCountRange getOperandCountRange() {
-    return SqlOperandCountRanges.of(2);
-  }
-
-  @Override
-  public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
-    final SqlNode left = callBinding.operand(0);
-    final SqlNode right = callBinding.operand(1);
-    final RelDataType type = callBinding.getValidator().deriveType(callBinding.getScope(), left);
-    boolean isRow = true;
-    if (type.getSqlTypeName() != SqlTypeName.ROW) {
-      isRow = false;
-    } else if (type.getSqlIdentifier().isStar()) {
-      isRow = false;
-    }
-    if (!isRow && throwOnFailure) {
-      throw callBinding.newValidationSignatureError();
-    }
-    return isRow && OperandTypes.STRING.checkSingleOperandType(callBinding, right, 0, throwOnFailure);
-  }
-
-  @Override
-  public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
-    final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
-    final RelDataType recordType = opBinding.getOperandType(0);
-    switch (recordType.getSqlTypeName()) {
-      case ROW:
-        final String fieldName = opBinding.getOperandLiteralValue(1, String.class);
-        String[] fieldNameChain = fieldName.split("\\.");
-        RelDataType relDataType = opBinding.getOperandType(0);
-        for (int i = 0; i < fieldNameChain.length; i++) {
-          RelDataTypeField t = relDataType.getField(fieldNameChain[i], true, true);
-          Preconditions.checkNotNull(t,
-              "Can not find " + fieldNameChain[i] + " within record " + recordType.toString() + " Original String "
-                  + Arrays.toString(fieldNameChain) + " Original row " + recordType.toString());
-          relDataType = t.getType();
-        }
-        if (recordType.isNullable()) {
-          return typeFactory.createTypeWithNullability(relDataType, true);
-        } else {
-          return relDataType;
-        }
-      default:
-        throw new AssertionError("First Operand is suppose to be a Row Struct");
-    }
-  }
-
-  private static class ExtractFunction implements ScalarFunction, ImplementableFunction {
-    private final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl();
-
-    @Override
-    public CallImplementor getImplementor() {
-      return RexImpTable.createImplementor((translator, call, translatedOperands) -> {
-        Preconditions.checkState(translatedOperands.size() == 2 && call.operands.size() == 2,
-            "Expected 2 operands found " + Math.min(translatedOperands.size(), call.getOperands().size()));
-        Expression op0 = translatedOperands.get(0);
-        Expression op1 = translatedOperands.get(1);
-        Preconditions.checkState(op1.getNodeType().equals(ExpressionType.Constant),
-            "Operand 2 has to be constant and got " + op1.getNodeType());
-        Preconditions.checkState(op1.type.equals(String.class), "Operand 2 has to be String and got " + op1.type);
-        final String fieldName = (String) ((ConstantExpression) op1).value;
-        String[] fieldNameChain = fieldName.split("\\.");
-        RelDataType relDataType = call.operands.get(0).getType();
-        Preconditions.checkState(relDataType.getSqlTypeName().equals(SqlTypeName.ROW),
-            "Expected first operand to be ROW found " + relDataType.toString());
-        Expression currentExpression = op0;
-        for (int i = 0; i < fieldNameChain.length; i++) {
-          Preconditions.checkState(relDataType.getSqlTypeName() == SqlTypeName.ROW,
-              "Must be ROW found " + relDataType.toString());
-          RelDataTypeField t = relDataType.getField(fieldNameChain[i], true, true);
-          Preconditions.checkNotNull(t,
-              "Notfound " + fieldNameChain[i] + " in the following struct " + relDataType.toString()
-                  + " Original String " + Arrays.toString(fieldNameChain) + " Original row " + call.operands.get(0)
-                  .getType());
-          currentExpression = Expressions.arrayIndex(Expressions.convert_(currentExpression, Object[].class),
-              Expressions.constant(t.getIndex()));
-          relDataType = t.getType();
-        }
-        Type fieldType = javaTypeFactory.getJavaClass(relDataType);
-        return EnumUtils.convert(currentExpression, fieldType);
-      }, NullPolicy.ARG0, false);
-    }
-
-    @Override
-    public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-      throw new IllegalStateException("should not be called");
-    }
-
-    @Override
-    public List<FunctionParameter> getParameters() {
-      return builder().add(Object[].class, "row").add(String.class, "path").build();
-    }
-  }
-
-  @Override
-  public String getAllowedSignatures(String opNameToUse) {
-    return opNameToUse + "(<ROW>, <VARCHAR>)";
-  }
-}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
index d234067..2a9dc13 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
@@ -129,7 +129,7 @@
     LogicalJoin join = (LogicalJoin) relNode;
     RelNode left = join.getLeft();
     RelNode right = join.getRight();
-    assertTrue("Was instance of "  + right.getClass(), right instanceof LogicalProject);
+    assertTrue(right instanceof LogicalTableScan);
     if (enableOptimizer) {
       assertTrue(left instanceof LogicalFilter);
       assertEquals("=(1, $2)", ((LogicalFilter) left).getCondition().toString());
@@ -187,9 +187,9 @@
     relNode = relNode.getInput(0);
     assertTrue(relNode instanceof LogicalFilter);
     if (enableOptimizer) {
-      assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+      assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
     } else {
-      assertEquals("AND(=(1, $11), =($2, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+      assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
     }
     relNode = relNode.getInput(0);
     if (enableOptimizer) {
@@ -202,7 +202,7 @@
     LogicalJoin join = (LogicalJoin) relNode;
     RelNode left = join.getLeft();
     RelNode right = join.getRight();
-    assertTrue("was instance of " + left.getClass(), left instanceof LogicalProject);
+    assertTrue(left instanceof LogicalTableScan);
     if (enableOptimizer) {
       assertTrue(right instanceof LogicalFilter);
       assertEquals("=(1, $2)", ((LogicalFilter) right).getCondition().toString());
@@ -289,14 +289,14 @@
     assertTrue(relNode instanceof LogicalProject);
     relNode = relNode.getInput(0);
     assertTrue(relNode instanceof LogicalFilter);
-    assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+    assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
     relNode = relNode.getInput(0);
     assertTrue(relNode instanceof LogicalJoin);
     assertEquals(2, relNode.getInputs().size());
     LogicalJoin join = (LogicalJoin) relNode;
     RelNode left = join.getLeft();
     RelNode right = join.getRight();
-    assertTrue(left instanceof LogicalProject);
+    assertTrue(left instanceof LogicalTableScan);
     assertTrue(right instanceof LogicalFilter);
     assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter) right).getCondition().toString());
     assertTrue(right.getInput(0) instanceof LogicalTableScan);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index cc33e5f..47355be 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -258,9 +258,6 @@
       record.put("address", createProfileAddressRecord(index));
       record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? null : index % COMPANIES.length);
       record.put("phoneNumbers", createProfilePhoneNumbers(index % PHONE_NUMBERS.length));
-      Map<String, Object> mapValues = new HashMap<>();
-      mapValues.put("key", createSimpleRecord(index, false));
-      record.put("mapValues", mapValues);
       return record;
     }
 
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 33e775b..dc193a2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -18,7 +18,6 @@
 */
 package org.apache.samza.sql.translator;
 
-import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,10 +28,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.context.ContainerContext;
@@ -72,7 +68,6 @@
 @PrepareForTest(LogicalProject.class)
 public class TestProjectTranslator extends TranslatorTestBase {
   private static final String LOGICAL_OP_ID = "sql0_project_0";
-  private static final String TEST_FIELD = "test_field";
 
   @Test
   public void testTranslate() throws IOException, ClassNotFoundException {
@@ -91,23 +86,12 @@
     when(mockProject.getInputs()).thenReturn(inputs);
     when(mockProject.getInput()).thenReturn(mockInput);
     RelDataType mockRowType = mock(RelDataType.class);
-    List<RelDataTypeField> relFields = new ArrayList<>();
-    String fieldName = TEST_FIELD;
-    int fieldPos = 0;
-    RelDataType dataType = mock(RelDataType.class);
-    when(dataType.getSqlTypeName()).thenReturn(SqlTypeName.ANY);
-    relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
     when(mockRowType.getFieldCount()).thenReturn(1);
     when(mockProject.getRowType()).thenReturn(mockRowType);
-    when(mockProject.getRowType().getSqlTypeName()).thenReturn(SqlTypeName.ROW);
-    when(mockProject.getRowType().getFieldList()).thenReturn(relFields);
-    when(mockProject.getRowType().isStruct()).thenReturn(true);
     RexNode mockRexField = mock(RexNode.class);
     List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
-    namedProjects.add(Pair.of(mockRexField, TEST_FIELD));
+    namedProjects.add(Pair.of(mockRexField, "test_field"));
     when(mockProject.getNamedProjects()).thenReturn(namedProjects);
-    when(mockProject.getRowType()).thenReturn(mockRowType);
-    when(mockProject.getRowType().getFieldNames()).thenReturn(ImmutableList.of(TEST_FIELD));
     StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
@@ -168,7 +152,7 @@
     }).when(mockExpr).execute(eq(executionContext), eq(mockContext), eq(dataContext),
         eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
     SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
-    assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(), Collections.singletonList(TEST_FIELD));
+    assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(), Collections.singletonList("test_field"));
     assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(), Collections.singletonList(mockFieldObj));
 
     // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 931036e..07c4ce2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -313,6 +313,30 @@
   }
 
   @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithoutJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
+
+    translator.translate(queryInfo.get(0), streamAppDesc, 0);
+  }
+
+  @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithFullJoinOperator() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
@@ -632,9 +656,9 @@
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -644,9 +668,9 @@
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", input4PhysicalName);
   }
 
   @Test
@@ -700,9 +724,9 @@
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -712,9 +736,9 @@
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", input4PhysicalName);
   }
 
   @Test
@@ -767,9 +791,9 @@
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -779,9 +803,9 @@
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PAGEVIEW", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", input4PhysicalName);
   }
 
   @Test
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 4a515c0..fab06af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -515,6 +515,7 @@
     Assert.assertEquals(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()), outMessages);
   }
 
+  @Ignore
   @Test
   public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {
     int numMessages = 10;
@@ -522,12 +523,9 @@
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
     String sql1 =
-        "Insert into testavro.outputTopic (id, bool_value)"
-            // SQL array is one indexed.
-            + " select `phoneNumbers`[1].`kind` as string_value, p.address.streetnum.number as id, "
-            + " `phoneNumbers`[1].`kind` = 'Home' as bool_value, cast(p.address.zip as bigint) as long_value"
-            + " from testavro.PROFILE as p where p.address.zip > 0 and p.address.zip < 100003 ";
-
+        "Insert into testavro.outputTopic"
+            + " select `phoneNumbers`[0].`kind`"
+            + " from testavro.PROFILE as p";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -541,72 +539,6 @@
     Assert.assertEquals(numMessages, outMessages.size());
   }
 
-  /**
-   * Testing the getNestedField built in operator
-   * @throws SamzaSqlValidatorException
-   */
-  @Test
-  public void testEndToEndGetNestedFieldOperator() throws SamzaSqlValidatorException {
-    int numMessages = 10;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 =
-        "Insert into testavro.outputTopic (string_value, id, bool_value, double_value, map_values, long_value)"
-            + " select GetNestedField(address, 'streetnum.number') * getNestedField(mapValues['key'], 'id') as id, "
-            + " cast(GetNestedField(address, 'streetnum').number * 1.0 as double) as double_value, mapValues as map_values, "
-            + " GetNestedField(phoneNumbers[1] ,'kind') = 'Home' as bool_value, cast( mapValues['key'].id as bigint) as long_value , "
-            + " GetNestedField(mapValues['key'], 'name') as string_value "
-            + " from testavro.PROFILE as p  where GetNestedField(address, 'zip') > 0 and GetNestedField(address, 'zip') < 100003";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
-    Config config = new MapConfig(staticConfigs);
-    new SamzaSqlValidator(config).validate(sqlStmts);
-    runApplication(config);
-
-    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
-    // check that the projected values are not null, correct types and good values when easy to check.
-    List<GenericRecord> actualResult = outMessages.stream()
-        .map(x -> (GenericRecord) x.getMessage())
-        .filter(x -> (Boolean) x.get("bool_value"))
-        .filter(x -> x.get("string_value") != null && !x.get("string_value").toString().isEmpty())
-        .filter(x -> x.get("map_values") instanceof Map)
-        .filter(x -> x.get("id") instanceof Integer)
-        .filter(x -> (Long) x.get("long_value") < 10 && (Long) x.get("long_value") >= 0)
-        .filter(x -> x.get("double_value") instanceof Double && (Double) x.get("double_value") >= 1234.0)
-        .collect(Collectors.toList());
-    Assert.assertEquals(
-        "Wrong results size, check the test condition against the Actual outputs -> " + outMessages.toString(),
-        numMessages, actualResult.size());
-  }
-
-
-  @Test
-  public void testEndToEndNestedRecordProjectFilter() throws SamzaSqlValidatorException {
-    int numMessages = 10;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
-    String sql1 = " Insert into testavro.PROFILE1 select (p.address.streetnum.number * p.address.zip) as id , "
-        + " p.address, `phoneNumbers`[1].`kind` = 'Home' as selfEmployed, "
-        + " MAP[cast(id as varchar), `phoneNumbers`[1].number] as mapValues, phoneNumbers, "
-        + " cast(companyId as varchar) || name ||`phoneNumbers`[1].number || 'concat' as name , "
-        + " 100 * ((companyId + 122) / 3 ) as companyId "
-        + " from testavro.PROFILE as p where p.address.zip > 0 "
-        + " and p.address.zip < 100003 and p.address.streetnum.number > 0 ";
-
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
-    Config config = new MapConfig(staticConfigs);
-    new SamzaSqlValidator(config).validate(sqlStmts);
-
-    runApplication(config);
-
-    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
-    Assert.assertEquals(numMessages, outMessages.size());
-  }
-
   @Test
   public void testEndToEndFlattenWithUdf() throws Exception {
     int numMessages = 20;
@@ -769,7 +701,7 @@
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.PROFILE1(id, address) "
-        + "select id, BuildOutputRecord('key', p.address.zip) as address from testavro.PROFILE as p";
+        + "select id, BuildOutputRecord('key', GetNestedField(address, 'zip')) as address from testavro.PROFILE";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     runApplication(new MapConfig(staticConfigs));
@@ -846,7 +778,7 @@
             + "       p.name as profileName, p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
-            + " on p.id = pv.profileId where p.name = 'Mike' or p.name is not null";
+            + " on p.id = pv.profileId";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -867,7 +799,7 @@
   }
 
   @Test
-  public void testEndToEndStreamTableInnerJoinWithPrimaryKey() {
+  public void testEndToEndStreamTableInnerJoinWithPrimaryKey() throws Exception {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index bd541c6..c41038d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -138,7 +138,7 @@
             + "       p.name as profileName, p.address as profileAddress "
             + "from testRemoteStore.Profile.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId where p.name is not null";
+            + " on p.__key__ = pv.profileId";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -336,7 +336,7 @@
             + "       p.name as profileName, p.address as profileAddress "
             + "from testRemoteStore.Profile.`$table` as p "
             + "right join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId where p.name is null or  p.name <> '0'";
+            + " on p.__key__ = pv.profileId";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -400,82 +400,6 @@
 
 
   @Test
-  public void testSourceEndToEndWithFilterAndInnerJoin() throws SamzaSqlValidatorException {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
-    populateProfileTable(staticConfigs, numMessages);
-
-    String sql = "Insert into testavro.enrichedPageViewTopic "
-        + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
-        + "       p.name as profileName, p.address as profileAddress "
-        + "from testavro.PAGEVIEW as pv  "
-        + "join testRemoteStore.Profile.`$table` as p "
-        + " on p.__key__ = pv.profileId"
-        + "  where p.name <> 'Mike' ";
-
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
-    Config config = new MapConfig(staticConfigs);
-    new SamzaSqlValidator(config).validate(sqlStmts);
-
-    runApplication(config);
-
-    List<String> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + (
-            ((GenericRecord) x.getMessage()).get("profileName") == null ? "null"
-                : ((GenericRecord) x.getMessage()).get("profileName").toString()))
-        .collect(Collectors.toList());
-    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages)
-        .stream()
-        .filter(x -> !x.contains("Mike"))
-        .collect(Collectors.toList());
-    Assert.assertEquals(expectedOutMessages, outMessages);
-  }
-
-  @Test
-  public void testSourceEndToEndWithFilterAndLeftOuterJoin() throws SamzaSqlValidatorException {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
-    populateProfileTable(staticConfigs, numMessages);
-
-    String sql = "Insert into testavro.enrichedPageViewTopic "
-        + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
-        + "       p.name as profileName, p.address as profileAddress "
-        + "from testavro.PAGEVIEW as pv  "
-        + " LEFT Join testRemoteStore.Profile.`$table` as p "
-        + " on  pv.profileId + 1 - (2/2) = p.__key__ "
-        + "  where p.name <> 'Mary' or p.name is null";
-
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
-    Config config = new MapConfig(staticConfigs);
-    new SamzaSqlValidator(config).validate(sqlStmts);
-
-    runApplication(config);
-
-    List<String> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + (
-            ((GenericRecord) x.getMessage()).get("profileName") == null ? "null"
-                : ((GenericRecord) x.getMessage()).get("profileName").toString()))
-        .collect(Collectors.toList());
-    List<String> expectedOutMessages =
-        TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages)
-            .stream()
-            .filter(x -> !x.contains("Mary"))
-            .collect(Collectors.toList());
-
-    Assert.assertEquals(expectedOutMessages, outMessages);
-  }
-
-  @Test
   public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws SamzaSqlValidatorException {
     int numMessages = 21;