Samza-2631: Samza-sql: Revert structured data support (#1472)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index c4138ea..b7cee00 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -201,12 +201,10 @@
.collect(Collectors.toList());
return avroList;
case MAP:
- // If you ask why not using String and that is because some strings are Wrapped into org.apache.avro.util.Utf8
- // TODO looking at the Utf8 code base it is not immutable, having it as a key is calling for trouble!
- final Map<Object, Object> outputMap = new HashMap<>();
- ((Map<Object, Object>) relObj).forEach((key, aValue) -> outputMap.put(key,
- convertToAvroObject(aValue, getNonNullUnionSchema(schema).getValueType())));
- return outputMap;
+ return ((Map<String, ?>) relObj).entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())));
case UNION:
for (Schema unionSchema : schema.getTypes()) {
if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index d837e03..5dddf15 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -148,10 +148,8 @@
}
private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean isNullable, boolean isOptional) {
- if (types.size() == 1) {
- return convertField(types.get(0), true, true);
- } else if (types.size() == 2) {
- // Typically a nullable field's schema is configured as an union of Null and a Type.
+ // Typically a nullable field's schema is configured as an union of Null and a Type.
+ if (types.size() == 2) {
if (types.get(0).getType() == Schema.Type.NULL) {
return convertField(types.get(1), true, true);
} else if (types.get(1).getType() == Schema.Type.NULL) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
new file mode 100644
index 0000000..4ef2a11
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.sql.schema.SamzaSqlFieldType;
+import org.apache.samza.sql.udfs.SamzaSqlUdf;
+import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+@SamzaSqlUdf(name = "GetNestedField", description = "UDF that extracts a field value from a nested SamzaSqlRelRecord")
+public class GetNestedFieldUdf implements ScalarUdf {
+ @Override
+ public void init(Config udfConfig, Context context) {
+ }
+
+ @SamzaSqlUdfMethod(params = {SamzaSqlFieldType.ANY, SamzaSqlFieldType.STRING},
+ returns = SamzaSqlFieldType.ANY)
+ public Object execute(Object currentFieldOrValue, String fieldName) {
+ GetSqlFieldUdf udf = new GetSqlFieldUdf();
+ return udf.getSqlField(currentFieldOrValue, fieldName);
+ }
+}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
index 60794ef..ccbee6a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -98,11 +98,11 @@
if (parsedSqlArgType.getSqlTypeName() == SqlTypeName.CHAR && udfArgumentAsSqlType == SqlTypeName.VARCHAR) {
return true;
} else if (!Objects.equals(parsedSqlArgType.getSqlTypeName(), udfArgumentAsSqlType)
- && !ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) && hasOneUdfMethod(udfMetadata)) {
+ && !ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) && hasOneUdfMethod(udfMetadata)) {
// 3(b). Throw up and fail on mismatch between the SamzaSqlType and CalciteType for any argument.
- String msg = String.format(
- "Type mismatch in udf class: %s at argument index: %d." + "Expected type: %s, actual type: %s.",
- udfMetadata.getName(), udfArgumentIndex, udfArgumentAsSqlType, parsedSqlArgType.getSqlTypeName());
+ String msg = String.format("Type mismatch in udf class: %s at argument index: %d." +
+ "Expected type: %s, actual type: %s.", udfMetadata.getName(),
+ udfArgumentIndex, parsedSqlArgType.getSqlTypeName(), udfArgumentAsSqlType);
LOG.error(msg);
throw new SamzaSqlValidatorException(msg);
}
@@ -159,9 +159,8 @@
static SqlTypeName toCalciteSqlType(SamzaSqlFieldType samzaSqlFieldType) {
switch (samzaSqlFieldType) {
case ANY:
- return SqlTypeName.ANY;
case ROW:
- return SqlTypeName.ROW;
+ return SqlTypeName.ANY;
case MAP:
return SqlTypeName.MAP;
case ARRAY:
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index dc37753..767df43 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -20,13 +20,15 @@
package org.apache.samza.sql.planner;
import com.google.common.collect.ImmutableList;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
@@ -60,10 +62,10 @@
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
+import org.apache.samza.sql.interfaces.UdfMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,42 +122,56 @@
}
private Planner getPlanner() {
- Planner planner;
- SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
- registerSourceSchemas(rootSchema);
+ Planner planner = null;
+ try {
+ Connection connection = DriverManager.getConnection("jdbc:calcite:");
+ CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+ SchemaPlus rootSchema = calciteConnection.getRootSchema();
+ registerSourceSchemas(rootSchema);
- List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions =
- udfMetadata.stream().map(SamzaSqlScalarFunctionImpl::new).collect(Collectors.toList());
+ List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
+ .map(SamzaSqlScalarFunctionImpl::new)
+ .collect(Collectors.toList());
- final List<RelTraitDef> traitDefs = new ArrayList<>();
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(new SamzaSqlOperatorTable());
- sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(new SamzaSqlOperatorTable());
+ sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
- // TODO: Introduce a pluggable rule factory.
- List<RelOptRule> rules = ImmutableList.of(FilterProjectTransposeRule.INSTANCE, ProjectMergeRule.INSTANCE,
- new SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, RelFactories.LOGICAL_BUILDER,
- systemStreamConfigBySource));
+ // TODO: Introduce a pluggable rule factory.
+ List<RelOptRule> rules = ImmutableList.of(
+ FilterProjectTransposeRule.INSTANCE,
+ ProjectMergeRule.INSTANCE,
+ new SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, RelFactories.LOGICAL_BUILDER,
+ systemStreamConfigBySource));
- // Using lenient so that !=,%,- are allowed.
- FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder()
- .setLex(Lex.JAVA)
- .setConformance(SqlConformanceEnum.LENIENT)
- .setCaseSensitive(false) // Make Udfs case insensitive
- .build())
- .defaultSchema(rootSchema)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
- .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
- .traitDefs(traitDefs)
- .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
- .build();
- planner = Frameworks.getPlanner(frameworkConfig);
- return planner;
+ // Using lenient so that !=,%,- are allowed.
+ FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder()
+ .setLex(Lex.JAVA)
+ .setConformance(SqlConformanceEnum.LENIENT)
+ .setCaseSensitive(false) // Make Udfs case insensitive
+ .build())
+ .defaultSchema(rootSchema)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
+ .traitDefs(traitDefs)
+ .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
+ .build();
+ planner = Frameworks.getPlanner(frameworkConfig);
+ return planner;
+ } catch (Exception e) {
+ String errorMsg = "Failed to create planner.";
+ LOG.error(errorMsg, e);
+ if (planner != null) {
+ planner.close();
+ }
+ throw new SamzaException(errorMsg, e);
+ }
}
private RelRoot optimize(Planner planner, RelRoot relRoot) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index fcc289c..c3735a4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -21,13 +21,11 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.sql.type.ArraySqlType;
import org.apache.calcite.sql.type.MapSqlType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
@@ -96,11 +94,6 @@
case INT64:
return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true);
case ROW:
- final RelDataType rowType = convertToRelSchema(fieldSchema.getRowSchema());
- /* Using Fully Qualified names to ensure that at the last project the row is fully reconstructed */
- return createTypeWithNullability(createStructType(StructKind.FULLY_QUALIFIED,
- rowType.getFieldList().stream().map(RelDataTypeField::getType).collect(Collectors.toList()),
- rowType.getFieldNames()), true);
case ANY:
// TODO Calcite execution engine doesn't support record type yet.
return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index 6b8e8ba..3098af7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -35,7 +35,6 @@
import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
-import org.apache.samza.sql.udf.GetNestedField;
/**
@@ -144,7 +143,6 @@
public static final SqlFunction TUMBLE = SqlStdOperatorTable.TUMBLE;
public static final SqlFunction TUMBLE_END = SqlStdOperatorTable.TUMBLE_END;
public static final SqlFunction TUMBLE_START = SqlStdOperatorTable.TUMBLE_START;
- public static final SqlFunction GET_NESTED_FIELD_OP = GetNestedField.INSTANCE;
public SamzaSqlOperatorTable() {
init();
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index b1b9001..86af594 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -61,7 +61,6 @@
* NOTE: This constructor is called from {@link ApplicationRunners} through reflection.
* Please refrain from updating the signature or removing this constructor unless the caller has changed the interface.
*/
- @SuppressWarnings("unused") /* used via reflection */
public SamzaSqlApplicationRunner(SamzaApplication app, Config config) {
this(app, false, config);
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index b42569c..604f061 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -80,7 +80,6 @@
this.context = context;
this.translatorContext = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.filter = (LogicalFilter) this.translatorContext.getRelNode(filterId);
- LOG.info("Compiling Operator {}", filter.getDigest());
this.expr = this.translatorContext.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
ContainerContext containerContext = context.getContainerContext();
metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -97,10 +96,9 @@
public boolean apply(SamzaSqlRelMessage message) {
long startProcessing = System.nanoTime();
Object[] result = new Object[1];
- Object[] inputRow = ProjectTranslator.convertToJavaRow(message.getSamzaSqlRelRecord());
try {
expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
- inputRow, result);
+ message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
} catch (Exception e) {
String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
LOG.error(errMsg, e);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
index c51ff25..1a0a7ec 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -74,7 +74,7 @@
}
String getSourceName() {
- return SqlIOConfig.getSourceFromSourceParts(getTableScan(relNode).getTable().getQualifiedName());
+ return SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
}
boolean isPosOnRight() {
@@ -109,12 +109,4 @@
}
}
- private static TableScan getTableScan(RelNode relNode) {
- if (relNode instanceof TableScan) {
- return (TableScan) relNode;
- }
- // we deal with Single inputs filter/project
- assert relNode.getInputs().size() == 1;
- return getTableScan(relNode.getInput(0));
- }
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 26d9fa0..02c434d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -40,7 +40,6 @@
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.rex.RexSlot;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
@@ -80,7 +79,7 @@
class JoinTranslator {
private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
- private final String logicalOpId;
+ private String logicalOpId;
private final String intermediateStreamPrefix;
private final int queryId;
private final TranslatorInputMetricsMapFunction inputMetricsMF;
@@ -166,16 +165,8 @@
if (tableNode.isRemoteTable()) {
String remoteTableName = tableNode.getSourceName();
- MessageStream<SamzaSqlRelMessage> operatorStack = context.getMessageStream(tableNode.getRelNode().getId());
- final StreamTableJoinFunction<Object, SamzaSqlRelMessage, KV, SamzaSqlRelMessage> joinFn;
- if (operatorStack != null && operatorStack instanceof MessageStreamCollector) {
- joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
- context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId,
- (MessageStreamCollector) operatorStack);
- } else {
- joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
- context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
- }
+ StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+ context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
return inputStream
.map(inputMetricsMF)
@@ -184,11 +175,7 @@
// Join with the local table
- StreamTableJoinFunction<SamzaSqlRelRecord,
- SamzaSqlRelMessage,
- KV<SamzaSqlRelRecord, SamzaSqlRelMessage>,
- SamzaSqlRelMessage>
- joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
+ StreamTableJoinFunction joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
(SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
@@ -283,8 +270,8 @@
isTablePosOnRight ? join.getRowType().getFieldCount() : join.getLeft().getRowType().getFieldCount();
List<Integer> tableRefsIdx = refCollector.stream()
- .map(RexSlot::getIndex)
- .filter(x -> (tableStartIndex <= x) && (x < tableEndIndex)) // collect all the refs form table side
+ .map(x -> x.getIndex())
+ .filter(x -> tableStartIndex <= x && x < tableEndIndex) // collect all the refs form table side
.map(x -> x - tableStartIndex) // re-adjust the offset
.sorted()
.collect(Collectors.toList()); // we have a list with all the input from table side with 0 based index.
@@ -357,12 +344,11 @@
private void validateJoinKeyType(RexInputRef ref) {
SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
- // Primitive types and Row (for the record key) and ANY for other fields like __key__
- // TODO this need to be pulled to a common class/place that have all the supported types
+ // Primitive types and ANY (for the record key) are supported in the key
if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
&& sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
&& sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
- && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER && sqlTypeName != SqlTypeName.ROW) {
+ && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER) {
log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
throw new SamzaException("Unsupported key type used in join condition.");
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
deleted file mode 100644
index 0ddc136..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.translator;
-
-import java.io.Closeable;
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.function.Function;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.AsyncFlatMapFunction;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.table.Table;
-
-
-/**
- * Collector of Map and Filter Samza Functions to collect call stack on the top of Remote table.
- * This Collector will be used by Join operator and trigger it when applying the join function post lookup.
- *
- * Note that this is needed because the Remote Table can not expose a proper {@code MessageStream}.
- * It is a work around to minimize the amount of code changes of the current Query Translator {@link org.apache.samza.sql.translator.QueryTranslator},
- * But in an ideal world, we should use Calcite planner in conventional way we can combine function when via translation of RelNodes.
- */
-class MessageStreamCollector implements MessageStream<SamzaSqlRelMessage>, Serializable, Closeable {
-
- /**
- * Queue First in First to be Fired order of the operators on the top of Remote Table Scan.
- */
- private final Deque<MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>> mapFnCallQueue =
- new ArrayDeque<>();
-
- /**
- * Function to chain the call to close from each operator.
- */
- private transient Function<Void, Void> closeFn = aVoid -> null;
-
- @Override
- public <OM> MessageStream<OM> map(MapFunction<? super SamzaSqlRelMessage, ? extends OM> mapFn) {
- mapFnCallQueue.offer((MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>) mapFn);
- return (MessageStream<OM>) this;
- }
-
- @Override
- public MessageStream<SamzaSqlRelMessage> filter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
- mapFnCallQueue.offer(new FilterMapAdapter(filterFn));
- return this;
- }
-
- /**
- * This function is called by the join operator on run time to apply filter and projects post join lookup.
- *
- * @param context Samza Execution Context
- * @return {code null} case filter reject the row, Samza Relational Record as it goes via Projects.
- */
- Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
- Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
- Function<Void, Void> intFn = aVoid -> null; // Projects and Filters both need to be initialized.
- closeFn = aVoid -> null;
- // At this point we have a the queue of operator, where first in is the first operator on top of TableScan.
- while (!mapFnCallQueue.isEmpty()) {
- MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage> f = mapFnCallQueue.poll();
- intFn = intFn.andThen((aVoid) -> {
- f.init(context);
- return null;
- });
- closeFn.andThen((aVoid) -> {
- f.close();
- return null;
- });
-
- Function<SamzaSqlRelMessage, SamzaSqlRelMessage> current = x -> x == null ? null : f.apply(x);
- if (tailFn == null) {
- tailFn = current;
- } else {
- tailFn = current.compose(tailFn);
- }
- }
- // TODO TBH not sure about this need to check if Samza Framework will be okay with late init call.
- intFn.apply(null); // Init call has to happen here.
- return tailFn == null ? Function.identity() : tailFn;
- }
-
- /**
- * Filter adapter is used to compose filters with {@code MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage>}
- * Filter function will return {@code null} when input is {@code null} or filter condition reject current row.
- */
- private static class FilterMapAdapter implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
- private final FilterFunction<? super SamzaSqlRelMessage> filterFn;
-
- private FilterMapAdapter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
- this.filterFn = filterFn;
- }
-
- @Override
- public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
- if (message != null && filterFn.apply(message)) {
- return message;
- }
- // null on case no match
- return null;
- }
-
- @Override
- public void close() {
- filterFn.close();
- }
-
- @Override
- public void init(Context context) {
- filterFn.init(context);
- }
- }
-
- @Override
- public void close() {
- if (closeFn != null) {
- closeFn.apply(null);
- }
- }
-
- @Override
- public <OM> MessageStream<OM> flatMap(FlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> flatMapFn) {
- return null;
- }
-
- @Override
- public <OM> MessageStream<OM> flatMapAsync(
- AsyncFlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> asyncFlatMapFn) {
- return null;
- }
-
- @Override
- public void sink(SinkFunction<? super SamzaSqlRelMessage> sinkFn) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public MessageStream<SamzaSqlRelMessage> sendTo(OutputStream<SamzaSqlRelMessage> outputStream) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<SamzaSqlRelMessage, K, WV> window, String id) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
- JoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super OM, ? extends JM> joinFn, Serde<K> keySerde,
- Serde<SamzaSqlRelMessage> messageSerde, Serde<OM> otherMessageSerde, Duration ttl, String id) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
- StreamTableJoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super R, ? extends JM> joinFn,
- Object... args) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public MessageStream<SamzaSqlRelMessage> merge(
- Collection<? extends MessageStream<? extends SamzaSqlRelMessage>> otherStreams) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super SamzaSqlRelMessage, ? extends K> keyExtractor,
- MapFunction<? super SamzaSqlRelMessage, ? extends V> valueExtractor, KVSerde<K, V> serde, String id) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object... args) {
- throw new IllegalStateException("Not valid state");
- }
-
- @Override
- public MessageStream<SamzaSqlRelMessage> broadcast(Serde<SamzaSqlRelMessage> serde, String id) {
- throw new IllegalStateException("Not valid state");
- }
-}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 79f58e8..1ebea5f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,14 +19,11 @@
package org.apache.samza.sql.translator;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.calcite.DataContext;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
@@ -40,9 +37,7 @@
import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.Expression;
-import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
@@ -54,7 +49,7 @@
* Translator to translate the Project node in the relational graph to the corresponding StreamGraph
* implementation.
*/
-public class ProjectTranslator {
+class ProjectTranslator {
private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class);
//private transient int messageIndex = 0;
@@ -65,122 +60,6 @@
}
/**
- * Converts the resulting row from Calcite Expression Evaluator to SamzaRelRecord to be sent downstream.
- *
- * @param objects input objects to be converted
- * @param rowType Calcite row type of the resulting row
- * @return return a valid message Stream of type SamzaSqlRelRecord
- */
- public static SamzaSqlRelRecord buildSamzaRelRecord(Object[] objects, RelDataType rowType) {
- Preconditions.checkNotNull(objects, "Input objects can not be null");
- Preconditions.checkState(rowType.isStruct(), "Row Type has to be a Struct and got " + rowType.getSqlTypeName());
- Preconditions.checkState(objects.length == rowType.getFieldCount(),
- "Objects counts and type counts must match " + objects.length + " vs " + rowType.getFieldCount());
- List<String> names = new ArrayList<>(rowType.getFieldNames());
- List<Object> values = new ArrayList<>(rowType.getFieldCount());
- for (int i = 0; i < objects.length; i++) {
- Object val = objects[i];
- if (val == null) {
- values.add(null);
- continue;
- }
- final RelDataType valueType = rowType.getFieldList().get(i).getType();
- values.add(convertToSamzaSqlType(val, valueType));
- }
- return new SamzaSqlRelRecord(names, values);
- }
-
- /**
- * Recursively converts a Primitive Java Object to valid Samza Rel Record field type.
- *
- * @param value input value to be converted
- * @param dataType value type as derived by Calcite
- * @return SamzaRelRecord or primitive SamzaRelRecord field.
- *
- */
- private static Object convertToSamzaSqlType(Object value, RelDataType dataType) {
- if (value == null) {
- return null;
- }
- switch (dataType.getSqlTypeName()) {
- case ROW:
- List<String> names = new ArrayList<>(dataType.getFieldNames());
- // Row Struct is represent as Object array in Calcite.
- Object[] row = (Object[]) value;
- List<Object> values = new ArrayList<>(row.length);
- for (int i = 0; i < row.length; i++) {
- values.add(convertToSamzaSqlType(row[i], dataType.getFieldList().get(i).getType()));
- }
- return new SamzaSqlRelRecord(names, values);
- case MAP:
- Map<Object, Object> objectMap = (Map<Object, Object>) value;
- Map<Object, Object> resultMap = new HashMap<>();
- final RelDataType valuesType = dataType.getValueType();
- objectMap.forEach((key, v) -> resultMap.put(key, convertToSamzaSqlType(v, valuesType)));
- return resultMap;
- case ARRAY:
- List<Object> objectList = (List<Object>) value;
- final RelDataType elementsType = dataType.getComponentType();
- return objectList.stream().map(e -> convertToSamzaSqlType(e, elementsType)).collect(Collectors.toList());
- case BOOLEAN:
- case BIGINT:
- case BINARY:
- case INTEGER:
- case TINYINT:
- case DOUBLE:
- case FLOAT:
- case REAL:
- case VARCHAR:
- case CHAR:
- case VARBINARY:
- case ANY:
- case OTHER:
- // today we treat everything else as Type Any or Other, this is not ideal.
- // this will change when adding timestamps support or more complex non java primitive types.
- // TODO in a better world we need to add type factory that can do the conversion between calcite and samza.
- return value;
- default:
- // As of today we treat everything else as type ANY
- throw new IllegalStateException("Unknown SQL type " + dataType.getSqlTypeName());
- }
- }
-
- /**
- * Converts the Samza Record to a Java Primitive Row format that's in convention with Calcite Enum operators.
- *
- * @param samzaSqlRelRecord input record.
- * @return row of Java Primitive conform to org.apache.calcite.adapter.enumerable.JavaRowFormat#ARRAY
- */
- public static Object[] convertToJavaRow(SamzaSqlRelRecord samzaSqlRelRecord) {
- if (samzaSqlRelRecord == null) {
- return null;
- }
- Object[] inputRow = new Object[samzaSqlRelRecord.getFieldValues().size()];
- for (int i = 0; i < inputRow.length; i++) {
- inputRow[i] = asPrimitiveJavaRow(samzaSqlRelRecord.getFieldValues().get(i));
- }
- return inputRow;
- }
-
- private static Object asPrimitiveJavaRow(Object inputObject) {
- if (inputObject == null) {
- return null;
- }
- if (inputObject instanceof SamzaSqlRelRecord) {
- return convertToJavaRow((SamzaSqlRelRecord) inputObject);
- }
- if (inputObject instanceof List) {
- return ((List) inputObject).stream().map(e -> asPrimitiveJavaRow(e)).collect(Collectors.toList());
- }
- if (inputObject instanceof Map) {
- Map<Object, Object> objectMap = new HashMap<>();
- ((Map<Object, Object>) inputObject).forEach((k, v) -> objectMap.put(k, asPrimitiveJavaRow(v)));
- return objectMap;
- }
- return inputObject;
- }
-
- /**
* ProjectMapFunction implements MapFunction to map input SamzaSqlRelMessages, one at a time, to a new
* SamzaSqlRelMessage which consists of the projected fields
*/
@@ -214,7 +93,6 @@
this.translatorContext =
((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.project = (Project) this.translatorContext.getRelNode(projectId);
- LOG.info("Compiling operator {} ", project.getDigest());
this.expr = this.translatorContext.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
ContainerContext containerContext = context.getContainerContext();
metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -235,19 +113,20 @@
long arrivalTime = System.nanoTime();
RelDataType type = project.getRowType();
Object[] output = new Object[type.getFieldCount()];
- Object[] inputRow = convertToJavaRow(message.getSamzaSqlRelRecord());
- SamzaSqlExecutionContext execContext = translatorContext.getExecutionContext();
- DataContext dataRootContext = translatorContext.getDataContext();
try {
- expr.execute(execContext, context, dataRootContext, inputRow, output);
+ expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
+ message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
} catch (Exception e) {
String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
LOG.error(errMsg, e);
throw new SamzaException(errMsg, e);
}
- SamzaSqlRelRecord record = buildSamzaRelRecord(output, project.getRowType());
+ List<String> names = new ArrayList<>();
+ for (int index = 0; index < output.length; index++) {
+ names.add(index, project.getNamedProjects().get(index).getValue());
+ }
updateMetrics(arrivalTime, System.nanoTime(), message.getSamzaSqlRelMsgMetadata().isNewInputMessage);
- return new SamzaSqlRelMessage(record, message.getSamzaSqlRelMsgMetadata());
+ return new SamzaSqlRelMessage(names, Arrays.asList(output), message.getSamzaSqlRelMsgMetadata());
}
/**
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
index 8a60502..6a93b2d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
@@ -21,8 +21,6 @@
import java.util.List;
import java.util.Objects;
-import java.util.function.Function;
-import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
@@ -44,35 +42,15 @@
private transient SamzaRelTableKeyConverter relTableKeyConverter;
private final String tableName;
private final int queryId;
- /**
- * Projection and Filter Function to apply post the join lookup. Function will return null in case filter rejects row.
- */
- private Function<SamzaSqlRelMessage, SamzaSqlRelMessage> projectFunction;
- /**
- * Projects and Filters operator queue.
- */
- private final MessageStreamCollector messageStreamCollector;
-
- public SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, SamzaRelTableKeyConverter tableKeyConverter,
- JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType, int queryId,
- MessageStreamCollector messageStreamCollector) {
- super(streamNode, tableNode, joinRelType);
- this.msgConverter = msgConverter;
- this.relTableKeyConverter = tableKeyConverter;
- this.tableName = tableNode.getSourceName();
- this.queryId = queryId;
- this.messageStreamCollector = messageStreamCollector;
- }
SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, SamzaRelTableKeyConverter tableKeyConverter,
JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType, int queryId) {
super(streamNode, tableNode, joinRelType);
+
this.msgConverter = msgConverter;
this.relTableKeyConverter = tableKeyConverter;
this.tableName = tableNode.getSourceName();
this.queryId = queryId;
- this.projectFunction = Function.identity();
- this.messageStreamCollector = null;
}
@Override
@@ -81,24 +59,13 @@
((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.msgConverter = translatorContext.getMsgConverter(tableName);
this.relTableKeyConverter = translatorContext.getTableKeyConverter(tableName);
- if (messageStreamCollector != null) {
- projectFunction = messageStreamCollector.getFunction(context);
- }
}
- /**
- * Compute the projection and filter post join lookup.
- *
- * @param record input record as result of lookup
- * @return the projected row or {@code null} if Row doesn't pass the filter condition.
- */
@Override
- @Nullable
protected List<Object> getTableRelRecordFieldValues(KV record) {
// Using the message rel converter, convert message to sql rel message and add to output values.
- final SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
- final SamzaSqlRelMessage result = projectFunction.apply(relMessage);
- return result == null ? null : result.getSamzaSqlRelRecord().getFieldValues();
+ SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
+ return relMessage.getSamzaSqlRelRecord().getFieldValues();
}
@Override
@@ -109,8 +76,6 @@
return null;
}
// Using the table key converter, convert message key from rel format to the record key format.
- // TODO: On way to avoid the object type here is to ensure that:
- // table's key is a SamzaRelRecord or any well defined type when defining the table descriptor
return relTableKeyConverter.convertToTableKeyFormat(keyRecord);
}
@@ -118,12 +83,4 @@
public Object getRecordKey(KV record) {
return record.getKey();
}
-
- @Override
- public void close() {
- super.close();
- if (messageStreamCollector != null) {
- messageStreamCollector.close();
- }
- }
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index e8fa451..cd7ecdc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.commons.lang3.Validate;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -49,7 +48,6 @@
// Table field names are used in the outer join when the table record is not found.
private final ArrayList<String> tableFieldNames;
private final ArrayList<String> outFieldNames;
- final private List<Object> nullRow;
SamzaSqlTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType) {
this.joinRelType = joinRelType;
@@ -71,7 +69,6 @@
outFieldNames.addAll(tableFieldNames);
outFieldNames.addAll(streamNode.getFieldNames());
}
- nullRow = tableFieldNames.stream().map(x -> null).collect(Collectors.toList());
}
@Override
@@ -96,10 +93,7 @@
// Add the table record fields.
if (record != null) {
- List<Object> row = getTableRelRecordFieldValues(record);
- // null in case the filter did not match thus row has to be removed if inner join or padded null case outer join
- if (row == null && joinRelType.compareTo(JoinRelType.INNER) == 0) return null;
- outFieldValues.addAll(row == null ? nullRow : row);
+ outFieldValues.addAll(getTableRelRecordFieldValues(record));
} else {
// Table record could be null as the record could not be found in the store. This can
// happen for outer joins. Add nulls to all the field values in the output message.
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 87c4e00..f58140d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -170,10 +170,7 @@
// SqlIOResolverFactory.
// For local table, even though table descriptor is already defined, we still need to create the input stream
// descriptor to load the local table.
- // To handle case where a project or filter is pushed to Remote table Scan will collect the operators and feed it to the join operator.
- // TODO In an ideal world this has to change and use Calcite Pattern matching to translate the plan.
if (isRemoteTable) {
- context.registerMessageStream(tableScan.getId(), new MessageStreamCollector());
return;
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 7d85652..5990897 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -34,7 +34,6 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.sql.data.RexToJavaCompiler;
@@ -53,7 +52,7 @@
private final RexToJavaCompiler compiler;
private final Map<String, SamzaRelConverter> relSamzaConverters;
private final Map<String, SamzaRelTableKeyConverter> relTableKeyConverters;
- private final Map<Integer, MessageStream<SamzaSqlRelMessage>> messageStreams;
+ private final Map<Integer, MessageStream> messageStreams;
private final Map<Integer, RelNode> relNodes;
private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
@@ -200,7 +199,7 @@
* @param id the id
* @return the message stream
*/
- MessageStream<SamzaSqlRelMessage> getMessageStream(int id) {
+ MessageStream getMessageStream(int id) {
return messageStreams.get(id);
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java b/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
deleted file mode 100644
index 87cf486..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.udf;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.calcite.adapter.enumerable.CallImplementor;
-import org.apache.calcite.adapter.enumerable.EnumUtils;
-import org.apache.calcite.adapter.enumerable.NullPolicy;
-import org.apache.calcite.adapter.enumerable.RexImpTable;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.tree.ConstantExpression;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.ExpressionType;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.FunctionParameter;
-import org.apache.calcite.schema.ImplementableFunction;
-import org.apache.calcite.schema.ScalarFunction;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperandCountRange;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
-
-import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
-
-
-/**
- * Operator to extract nested Rows or Fields form a struct row type using a dotted path.
- * The goal of this operator is two-fold.
- * First it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
- * Second it will enable smooth backward compatible migration from existing udf that relies on legacy row format.
- */
-public class GetNestedField extends SqlUserDefinedFunction {
-
- public static final SqlFunction INSTANCE = new GetNestedField(new ExtractFunction());
-
- public GetNestedField(Function function) {
- super(new SqlIdentifier("GetNestedField", SqlParserPos.ZERO), null, null, null, ImmutableList.of(), function);
- }
-
- @Override
- public SqlOperandCountRange getOperandCountRange() {
- return SqlOperandCountRanges.of(2);
- }
-
- @Override
- public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
- final SqlNode left = callBinding.operand(0);
- final SqlNode right = callBinding.operand(1);
- final RelDataType type = callBinding.getValidator().deriveType(callBinding.getScope(), left);
- boolean isRow = true;
- if (type.getSqlTypeName() != SqlTypeName.ROW) {
- isRow = false;
- } else if (type.getSqlIdentifier().isStar()) {
- isRow = false;
- }
- if (!isRow && throwOnFailure) {
- throw callBinding.newValidationSignatureError();
- }
- return isRow && OperandTypes.STRING.checkSingleOperandType(callBinding, right, 0, throwOnFailure);
- }
-
- @Override
- public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
- final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
- final RelDataType recordType = opBinding.getOperandType(0);
- switch (recordType.getSqlTypeName()) {
- case ROW:
- final String fieldName = opBinding.getOperandLiteralValue(1, String.class);
- String[] fieldNameChain = fieldName.split("\\.");
- RelDataType relDataType = opBinding.getOperandType(0);
- for (int i = 0; i < fieldNameChain.length; i++) {
- RelDataTypeField t = relDataType.getField(fieldNameChain[i], true, true);
- Preconditions.checkNotNull(t,
- "Can not find " + fieldNameChain[i] + " within record " + recordType.toString() + " Original String "
- + Arrays.toString(fieldNameChain) + " Original row " + recordType.toString());
- relDataType = t.getType();
- }
- if (recordType.isNullable()) {
- return typeFactory.createTypeWithNullability(relDataType, true);
- } else {
- return relDataType;
- }
- default:
- throw new AssertionError("First Operand is suppose to be a Row Struct");
- }
- }
-
- private static class ExtractFunction implements ScalarFunction, ImplementableFunction {
- private final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl();
-
- @Override
- public CallImplementor getImplementor() {
- return RexImpTable.createImplementor((translator, call, translatedOperands) -> {
- Preconditions.checkState(translatedOperands.size() == 2 && call.operands.size() == 2,
- "Expected 2 operands found " + Math.min(translatedOperands.size(), call.getOperands().size()));
- Expression op0 = translatedOperands.get(0);
- Expression op1 = translatedOperands.get(1);
- Preconditions.checkState(op1.getNodeType().equals(ExpressionType.Constant),
- "Operand 2 has to be constant and got " + op1.getNodeType());
- Preconditions.checkState(op1.type.equals(String.class), "Operand 2 has to be String and got " + op1.type);
- final String fieldName = (String) ((ConstantExpression) op1).value;
- String[] fieldNameChain = fieldName.split("\\.");
- RelDataType relDataType = call.operands.get(0).getType();
- Preconditions.checkState(relDataType.getSqlTypeName().equals(SqlTypeName.ROW),
- "Expected first operand to be ROW found " + relDataType.toString());
- Expression currentExpression = op0;
- for (int i = 0; i < fieldNameChain.length; i++) {
- Preconditions.checkState(relDataType.getSqlTypeName() == SqlTypeName.ROW,
- "Must be ROW found " + relDataType.toString());
- RelDataTypeField t = relDataType.getField(fieldNameChain[i], true, true);
- Preconditions.checkNotNull(t,
- "Notfound " + fieldNameChain[i] + " in the following struct " + relDataType.toString()
- + " Original String " + Arrays.toString(fieldNameChain) + " Original row " + call.operands.get(0)
- .getType());
- currentExpression = Expressions.arrayIndex(Expressions.convert_(currentExpression, Object[].class),
- Expressions.constant(t.getIndex()));
- relDataType = t.getType();
- }
- Type fieldType = javaTypeFactory.getJavaClass(relDataType);
- return EnumUtils.convert(currentExpression, fieldType);
- }, NullPolicy.ARG0, false);
- }
-
- @Override
- public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
- throw new IllegalStateException("should not be called");
- }
-
- @Override
- public List<FunctionParameter> getParameters() {
- return builder().add(Object[].class, "row").add(String.class, "path").build();
- }
- }
-
- @Override
- public String getAllowedSignatures(String opNameToUse) {
- return opNameToUse + "(<ROW>, <VARCHAR>)";
- }
-}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
index d234067..2a9dc13 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
@@ -129,7 +129,7 @@
LogicalJoin join = (LogicalJoin) relNode;
RelNode left = join.getLeft();
RelNode right = join.getRight();
- assertTrue("Was instance of " + right.getClass(), right instanceof LogicalProject);
+ assertTrue(right instanceof LogicalTableScan);
if (enableOptimizer) {
assertTrue(left instanceof LogicalFilter);
assertEquals("=(1, $2)", ((LogicalFilter) left).getCondition().toString());
@@ -187,9 +187,9 @@
relNode = relNode.getInput(0);
assertTrue(relNode instanceof LogicalFilter);
if (enableOptimizer) {
- assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
} else {
- assertEquals("AND(=(1, $11), =($2, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
}
relNode = relNode.getInput(0);
if (enableOptimizer) {
@@ -202,7 +202,7 @@
LogicalJoin join = (LogicalJoin) relNode;
RelNode left = join.getLeft();
RelNode right = join.getRight();
- assertTrue("was instance of " + left.getClass(), left instanceof LogicalProject);
+ assertTrue(left instanceof LogicalTableScan);
if (enableOptimizer) {
assertTrue(right instanceof LogicalFilter);
assertEquals("=(1, $2)", ((LogicalFilter) right).getCondition().toString());
@@ -289,14 +289,14 @@
assertTrue(relNode instanceof LogicalProject);
relNode = relNode.getInput(0);
assertTrue(relNode instanceof LogicalFilter);
- assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
relNode = relNode.getInput(0);
assertTrue(relNode instanceof LogicalJoin);
assertEquals(2, relNode.getInputs().size());
LogicalJoin join = (LogicalJoin) relNode;
RelNode left = join.getLeft();
RelNode right = join.getRight();
- assertTrue(left instanceof LogicalProject);
+ assertTrue(left instanceof LogicalTableScan);
assertTrue(right instanceof LogicalFilter);
assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter) right).getCondition().toString());
assertTrue(right.getInput(0) instanceof LogicalTableScan);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index cc33e5f..47355be 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -258,9 +258,6 @@
record.put("address", createProfileAddressRecord(index));
record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? null : index % COMPANIES.length);
record.put("phoneNumbers", createProfilePhoneNumbers(index % PHONE_NUMBERS.length));
- Map<String, Object> mapValues = new HashMap<>();
- mapValues.put("key", createSimpleRecord(index, false));
- record.put("mapValues", mapValues);
return record;
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 33e775b..dc193a2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -18,7 +18,6 @@
*/
package org.apache.samza.sql.translator;
-import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,10 +28,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.context.ContainerContext;
@@ -72,7 +68,6 @@
@PrepareForTest(LogicalProject.class)
public class TestProjectTranslator extends TranslatorTestBase {
private static final String LOGICAL_OP_ID = "sql0_project_0";
- private static final String TEST_FIELD = "test_field";
@Test
public void testTranslate() throws IOException, ClassNotFoundException {
@@ -91,23 +86,12 @@
when(mockProject.getInputs()).thenReturn(inputs);
when(mockProject.getInput()).thenReturn(mockInput);
RelDataType mockRowType = mock(RelDataType.class);
- List<RelDataTypeField> relFields = new ArrayList<>();
- String fieldName = TEST_FIELD;
- int fieldPos = 0;
- RelDataType dataType = mock(RelDataType.class);
- when(dataType.getSqlTypeName()).thenReturn(SqlTypeName.ANY);
- relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
when(mockRowType.getFieldCount()).thenReturn(1);
when(mockProject.getRowType()).thenReturn(mockRowType);
- when(mockProject.getRowType().getSqlTypeName()).thenReturn(SqlTypeName.ROW);
- when(mockProject.getRowType().getFieldList()).thenReturn(relFields);
- when(mockProject.getRowType().isStruct()).thenReturn(true);
RexNode mockRexField = mock(RexNode.class);
List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
- namedProjects.add(Pair.of(mockRexField, TEST_FIELD));
+ namedProjects.add(Pair.of(mockRexField, "test_field"));
when(mockProject.getNamedProjects()).thenReturn(namedProjects);
- when(mockProject.getRowType()).thenReturn(mockRowType);
- when(mockProject.getRowType().getFieldNames()).thenReturn(ImmutableList.of(TEST_FIELD));
StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
@@ -168,7 +152,7 @@
}).when(mockExpr).execute(eq(executionContext), eq(mockContext), eq(dataContext),
eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
- assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(), Collections.singletonList(TEST_FIELD));
+ assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(), Collections.singletonList("test_field"));
assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(), Collections.singletonList(mockFieldObj));
// Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 931036e..07c4ce2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -313,6 +313,30 @@
}
@Test (expected = SamzaException.class)
+ public void testTranslateStreamTableJoinWithoutJoinOperator() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
+ + " select p.name as profileName, pv.pageKey"
+ + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
+ + " where p.id = pv.profileId";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+ QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
+
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
+ }
+
+ @Test (expected = SamzaException.class)
public void testTranslateStreamTableJoinWithFullJoinOperator() {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
String sql =
@@ -632,9 +656,9 @@
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3", output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -644,9 +668,9 @@
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3", input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", input4PhysicalName);
}
@Test
@@ -700,9 +724,9 @@
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -712,9 +736,9 @@
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", input4PhysicalName);
}
@Test
@@ -767,9 +791,9 @@
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -779,9 +803,9 @@
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PAGEVIEW", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", input4PhysicalName);
}
@Test
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 4a515c0..fab06af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -515,6 +515,7 @@
Assert.assertEquals(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()), outMessages);
}
+ @Ignore
@Test
public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {
int numMessages = 10;
@@ -522,12 +523,9 @@
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic (id, bool_value)"
- // SQL array is one indexed.
- + " select `phoneNumbers`[1].`kind` as string_value, p.address.streetnum.number as id, "
- + " `phoneNumbers`[1].`kind` = 'Home' as bool_value, cast(p.address.zip as bigint) as long_value"
- + " from testavro.PROFILE as p where p.address.zip > 0 and p.address.zip < 100003 ";
-
+ "Insert into testavro.outputTopic"
+ + " select `phoneNumbers`[0].`kind`"
+ + " from testavro.PROFILE as p";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -541,72 +539,6 @@
Assert.assertEquals(numMessages, outMessages.size());
}
- /**
- * Testing the getNestedField built in operator
- * @throws SamzaSqlValidatorException
- */
- @Test
- public void testEndToEndGetNestedFieldOperator() throws SamzaSqlValidatorException {
- int numMessages = 10;
- TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 =
- "Insert into testavro.outputTopic (string_value, id, bool_value, double_value, map_values, long_value)"
- + " select GetNestedField(address, 'streetnum.number') * getNestedField(mapValues['key'], 'id') as id, "
- + " cast(GetNestedField(address, 'streetnum').number * 1.0 as double) as double_value, mapValues as map_values, "
- + " GetNestedField(phoneNumbers[1] ,'kind') = 'Home' as bool_value, cast( mapValues['key'].id as bigint) as long_value , "
- + " GetNestedField(mapValues['key'], 'name') as string_value "
- + " from testavro.PROFILE as p where GetNestedField(address, 'zip') > 0 and GetNestedField(address, 'zip') < 100003";
- List<String> sqlStmts = Collections.singletonList(sql1);
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
- Config config = new MapConfig(staticConfigs);
- new SamzaSqlValidator(config).validate(sqlStmts);
- runApplication(config);
-
- List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
- // check that the projected values are not null, correct types and good values when easy to check.
- List<GenericRecord> actualResult = outMessages.stream()
- .map(x -> (GenericRecord) x.getMessage())
- .filter(x -> (Boolean) x.get("bool_value"))
- .filter(x -> x.get("string_value") != null && !x.get("string_value").toString().isEmpty())
- .filter(x -> x.get("map_values") instanceof Map)
- .filter(x -> x.get("id") instanceof Integer)
- .filter(x -> (Long) x.get("long_value") < 10 && (Long) x.get("long_value") >= 0)
- .filter(x -> x.get("double_value") instanceof Double && (Double) x.get("double_value") >= 1234.0)
- .collect(Collectors.toList());
- Assert.assertEquals(
- "Wrong results size, check the test condition against the Actual outputs -> " + outMessages.toString(),
- numMessages, actualResult.size());
- }
-
-
- @Test
- public void testEndToEndNestedRecordProjectFilter() throws SamzaSqlValidatorException {
- int numMessages = 10;
- TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
- String sql1 = " Insert into testavro.PROFILE1 select (p.address.streetnum.number * p.address.zip) as id , "
- + " p.address, `phoneNumbers`[1].`kind` = 'Home' as selfEmployed, "
- + " MAP[cast(id as varchar), `phoneNumbers`[1].number] as mapValues, phoneNumbers, "
- + " cast(companyId as varchar) || name ||`phoneNumbers`[1].number || 'concat' as name , "
- + " 100 * ((companyId + 122) / 3 ) as companyId "
- + " from testavro.PROFILE as p where p.address.zip > 0 "
- + " and p.address.zip < 100003 and p.address.streetnum.number > 0 ";
-
- List<String> sqlStmts = Collections.singletonList(sql1);
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
- Config config = new MapConfig(staticConfigs);
- new SamzaSqlValidator(config).validate(sqlStmts);
-
- runApplication(config);
-
- List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
- Assert.assertEquals(numMessages, outMessages.size());
- }
-
@Test
public void testEndToEndFlattenWithUdf() throws Exception {
int numMessages = 20;
@@ -769,7 +701,7 @@
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.PROFILE1(id, address) "
- + "select id, BuildOutputRecord('key', p.address.zip) as address from testavro.PROFILE as p";
+ + "select id, BuildOutputRecord('key', GetNestedField(address, 'zip')) as address from testavro.PROFILE";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
runApplication(new MapConfig(staticConfigs));
@@ -846,7 +778,7 @@
+ " p.name as profileName, p.address as profileAddress "
+ "from testavro.PROFILE.`$table` as p "
+ "join testavro.PAGEVIEW as pv "
- + " on p.id = pv.profileId where p.name = 'Mike' or p.name is not null";
+ + " on p.id = pv.profileId";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -867,7 +799,7 @@
}
@Test
- public void testEndToEndStreamTableInnerJoinWithPrimaryKey() {
+ public void testEndToEndStreamTableInnerJoinWithPrimaryKey() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index bd541c6..c41038d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -138,7 +138,7 @@
+ " p.name as profileName, p.address as profileAddress "
+ "from testRemoteStore.Profile.`$table` as p "
+ "join testavro.PAGEVIEW as pv "
- + " on p.__key__ = pv.profileId where p.name is not null";
+ + " on p.__key__ = pv.profileId";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -336,7 +336,7 @@
+ " p.name as profileName, p.address as profileAddress "
+ "from testRemoteStore.Profile.`$table` as p "
+ "right join testavro.PAGEVIEW as pv "
- + " on p.__key__ = pv.profileId where p.name is null or p.name <> '0'";
+ + " on p.__key__ = pv.profileId";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -400,82 +400,6 @@
@Test
- public void testSourceEndToEndWithFilterAndInnerJoin() throws SamzaSqlValidatorException {
- int numMessages = 20;
- TestAvroSystemFactory.messages.clear();
- RemoteStoreIOResolverTestFactory.records.clear();
- Map<String, String> staticConfigs =
- SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
- populateProfileTable(staticConfigs, numMessages);
-
- String sql = "Insert into testavro.enrichedPageViewTopic "
- + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
- + " p.name as profileName, p.address as profileAddress "
- + "from testavro.PAGEVIEW as pv "
- + "join testRemoteStore.Profile.`$table` as p "
- + " on p.__key__ = pv.profileId"
- + " where p.name <> 'Mike' ";
-
- List<String> sqlStmts = Arrays.asList(sql);
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
- Config config = new MapConfig(staticConfigs);
- new SamzaSqlValidator(config).validate(sqlStmts);
-
- runApplication(config);
-
- List<String> outMessages = TestAvroSystemFactory.messages.stream()
- .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + (
- ((GenericRecord) x.getMessage()).get("profileName") == null ? "null"
- : ((GenericRecord) x.getMessage()).get("profileName").toString()))
- .collect(Collectors.toList());
- List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages)
- .stream()
- .filter(x -> !x.contains("Mike"))
- .collect(Collectors.toList());
- Assert.assertEquals(expectedOutMessages, outMessages);
- }
-
- @Test
- public void testSourceEndToEndWithFilterAndLeftOuterJoin() throws SamzaSqlValidatorException {
- int numMessages = 20;
- TestAvroSystemFactory.messages.clear();
- RemoteStoreIOResolverTestFactory.records.clear();
- Map<String, String> staticConfigs =
- SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
- populateProfileTable(staticConfigs, numMessages);
-
- String sql = "Insert into testavro.enrichedPageViewTopic "
- + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
- + " p.name as profileName, p.address as profileAddress "
- + "from testavro.PAGEVIEW as pv "
- + " LEFT Join testRemoteStore.Profile.`$table` as p "
- + " on pv.profileId + 1 - (2/2) = p.__key__ "
- + " where p.name <> 'Mary' or p.name is null";
-
- List<String> sqlStmts = Arrays.asList(sql);
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-
- Config config = new MapConfig(staticConfigs);
- new SamzaSqlValidator(config).validate(sqlStmts);
-
- runApplication(config);
-
- List<String> outMessages = TestAvroSystemFactory.messages.stream()
- .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + (
- ((GenericRecord) x.getMessage()).get("profileName") == null ? "null"
- : ((GenericRecord) x.getMessage()).get("profileName").toString()))
- .collect(Collectors.toList());
- List<String> expectedOutMessages =
- TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages)
- .stream()
- .filter(x -> !x.contains("Mary"))
- .collect(Collectors.toList());
-
- Assert.assertEquals(expectedOutMessages, outMessages);
- }
-
- @Test
public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws SamzaSqlValidatorException {
int numMessages = 21;