[BEAM-8888] Improve handling of unknown types in CalciteUtils.

Also enables the nullness checker to make sure we didn't miss anything.

Because of the design of the Row/Schemas API, we unfortunately have to
add a bunch of checks. These fields are probably not allowed to be null
during Schema construction, but the checker framework does not know
that.
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 34664ac..7b580ef 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -39,9 +40,6 @@
 import org.joda.time.base.AbstractInstant;
 
 /** Utility methods for Calcite related operations. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public class CalciteUtils {
   private static final long UNLIMITED_ARRAY_SIZE = -1L;
 
@@ -73,7 +71,9 @@
     }
 
     if (fieldType.getTypeName().isLogicalType()) {
-      String logicalId = fieldType.getLogicalType().getIdentifier();
+      Schema.LogicalType logicalType = fieldType.getLogicalType();
+      Preconditions.checkArgumentNotNull(logicalType);
+      String logicalId = logicalType.getIdentifier();
       return logicalId.equals(SqlTypes.DATE.getIdentifier())
           || logicalId.equals(SqlTypes.TIME.getIdentifier())
           || logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
@@ -88,7 +88,9 @@
     }
 
     if (fieldType.getTypeName().isLogicalType()) {
-      String logicalId = fieldType.getLogicalType().getIdentifier();
+      Schema.LogicalType logicalType = fieldType.getLogicalType();
+      Preconditions.checkArgumentNotNull(logicalType);
+      String logicalId = logicalType.getIdentifier();
       return logicalId.equals(CharType.IDENTIFIER);
     }
     return false;
@@ -210,7 +212,12 @@
                     + "so it cannot be converted to a %s",
                 sqlTypeName, Schema.FieldType.class.getSimpleName()));
       default:
-        return CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
+        FieldType fieldType = CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
+        if (fieldType == null) {
+          throw new IllegalArgumentException(
+              "Cannot find a matching Beam FieldType for Calcite type: " + sqlTypeName);
+        }
+        return fieldType;
     }
   }
 
@@ -234,7 +241,12 @@
         return FieldType.row(toSchema(calciteType));
 
       default:
-        return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
+        try {
+          return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException(
+              "Cannot find a matching Beam FieldType for Calcite type: " + calciteType, e);
+        }
     }
   }
 
@@ -254,16 +266,22 @@
     switch (fieldType.getTypeName()) {
       case ARRAY:
       case ITERABLE:
+        FieldType collectionElementType = fieldType.getCollectionElementType();
+        Preconditions.checkArgumentNotNull(collectionElementType);
         return dataTypeFactory.createArrayType(
-            toRelDataType(dataTypeFactory, fieldType.getCollectionElementType()),
-            UNLIMITED_ARRAY_SIZE);
+            toRelDataType(dataTypeFactory, collectionElementType), UNLIMITED_ARRAY_SIZE);
       case MAP:
-        RelDataType componentKeyType = toRelDataType(dataTypeFactory, fieldType.getMapKeyType());
-        RelDataType componentValueType =
-            toRelDataType(dataTypeFactory, fieldType.getMapValueType());
+        FieldType mapKeyType = fieldType.getMapKeyType();
+        FieldType mapValueType = fieldType.getMapValueType();
+        Preconditions.checkArgumentNotNull(mapKeyType);
+        Preconditions.checkArgumentNotNull(mapValueType);
+        RelDataType componentKeyType = toRelDataType(dataTypeFactory, mapKeyType);
+        RelDataType componentValueType = toRelDataType(dataTypeFactory, mapValueType);
         return dataTypeFactory.createMapType(componentKeyType, componentValueType);
       case ROW:
-        return toCalciteRowType(fieldType.getRowSchema(), dataTypeFactory);
+        Schema schema = fieldType.getRowSchema();
+        Preconditions.checkArgumentNotNull(schema);
+        return toCalciteRowType(schema, dataTypeFactory);
       default:
         return dataTypeFactory.createSqlType(toSqlTypeName(fieldType));
     }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
index 50b6ab2..e76ee7f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
@@ -30,13 +30,17 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /** Tests for conversion from Beam schema to Calcite data type. */
 public class CalciteUtilsTest {
 
   RelDataTypeFactory dataTypeFactory;
 
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
   @Before
   public void setUp() {
     dataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
@@ -166,4 +170,12 @@
 
     assertEquals(schema, out);
   }
+
+  @Test
+  public void testFieldTypeNotFound() {
+    RelDataType relDataType = dataTypeFactory.createUnknownType();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot find a matching Beam FieldType for Calcite type: UNKNOWN");
+    CalciteUtils.toFieldType(relDataType);
+  }
 }