[BEAM-8888] Improve handling of unknown types in CalciteUtils.
Also enables the nullness checker to make sure we didn't miss anything.
Because of the design of the Row/Schemas API, we unfortunately have to
add a bunch of checks. These fields are probably not allowed to be null
during Schema construction, but the checker framework does not know
that.
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 34664ac..7b580ef 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -39,9 +40,6 @@
import org.joda.time.base.AbstractInstant;
/** Utility methods for Calcite related operations. */
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
public class CalciteUtils {
private static final long UNLIMITED_ARRAY_SIZE = -1L;
@@ -73,7 +71,9 @@
}
if (fieldType.getTypeName().isLogicalType()) {
- String logicalId = fieldType.getLogicalType().getIdentifier();
+ Schema.LogicalType logicalType = fieldType.getLogicalType();
+ Preconditions.checkArgumentNotNull(logicalType);
+ String logicalId = logicalType.getIdentifier();
return logicalId.equals(SqlTypes.DATE.getIdentifier())
|| logicalId.equals(SqlTypes.TIME.getIdentifier())
|| logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
@@ -88,7 +88,9 @@
}
if (fieldType.getTypeName().isLogicalType()) {
- String logicalId = fieldType.getLogicalType().getIdentifier();
+ Schema.LogicalType logicalType = fieldType.getLogicalType();
+ Preconditions.checkArgumentNotNull(logicalType);
+ String logicalId = logicalType.getIdentifier();
return logicalId.equals(CharType.IDENTIFIER);
}
return false;
@@ -210,7 +212,12 @@
+ "so it cannot be converted to a %s",
sqlTypeName, Schema.FieldType.class.getSimpleName()));
default:
- return CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
+ FieldType fieldType = CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
+ if (fieldType == null) {
+ throw new IllegalArgumentException(
+ "Cannot find a matching Beam FieldType for Calcite type: " + sqlTypeName);
+ }
+ return fieldType;
}
}
@@ -234,7 +241,12 @@
return FieldType.row(toSchema(calciteType));
default:
- return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
+ try {
+ return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Cannot find a matching Beam FieldType for Calcite type: " + calciteType, e);
+ }
}
}
@@ -254,16 +266,22 @@
switch (fieldType.getTypeName()) {
case ARRAY:
case ITERABLE:
+ FieldType collectionElementType = fieldType.getCollectionElementType();
+ Preconditions.checkArgumentNotNull(collectionElementType);
return dataTypeFactory.createArrayType(
- toRelDataType(dataTypeFactory, fieldType.getCollectionElementType()),
- UNLIMITED_ARRAY_SIZE);
+ toRelDataType(dataTypeFactory, collectionElementType), UNLIMITED_ARRAY_SIZE);
case MAP:
- RelDataType componentKeyType = toRelDataType(dataTypeFactory, fieldType.getMapKeyType());
- RelDataType componentValueType =
- toRelDataType(dataTypeFactory, fieldType.getMapValueType());
+ FieldType mapKeyType = fieldType.getMapKeyType();
+ FieldType mapValueType = fieldType.getMapValueType();
+ Preconditions.checkArgumentNotNull(mapKeyType);
+ Preconditions.checkArgumentNotNull(mapValueType);
+ RelDataType componentKeyType = toRelDataType(dataTypeFactory, mapKeyType);
+ RelDataType componentValueType = toRelDataType(dataTypeFactory, mapValueType);
return dataTypeFactory.createMapType(componentKeyType, componentValueType);
case ROW:
- return toCalciteRowType(fieldType.getRowSchema(), dataTypeFactory);
+ Schema schema = fieldType.getRowSchema();
+ Preconditions.checkArgumentNotNull(schema);
+ return toCalciteRowType(schema, dataTypeFactory);
default:
return dataTypeFactory.createSqlType(toSqlTypeName(fieldType));
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
index 50b6ab2..e76ee7f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
@@ -30,13 +30,17 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
/** Tests for conversion from Beam schema to Calcite data type. */
public class CalciteUtilsTest {
RelDataTypeFactory dataTypeFactory;
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
@Before
public void setUp() {
dataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
@@ -166,4 +170,12 @@
assertEquals(schema, out);
}
+
+ @Test
+ public void testFieldTypeNotFound() {
+ RelDataType relDataType = dataTypeFactory.createUnknownType();
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Cannot find a matching Beam FieldType for Calcite type: UNKNOWN");
+ CalciteUtils.toFieldType(relDataType);
+ }
}