Simplify ZetaSqlBeamTranslationUtils
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index bff1202..460e600 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -167,8 +167,7 @@
       for (int i = 0; i < inputSchema.getFieldCount(); i++) {
         options.addExpressionColumn(
             columnName(i),
-            ZetaSqlBeamTranslationUtils.beamFieldTypeToZetaSqlType(
-                inputSchema.getField(i).getType()));
+            ZetaSqlBeamTranslationUtils.toZetaSqlType(inputSchema.getField(i).getType()));
       }
 
       exp = new PreparedExpression(sql);
@@ -182,15 +181,13 @@
       for (int i = 0; i < inputSchema.getFieldCount(); i++) {
         columns.put(
             columnName(i),
-            ZetaSqlBeamTranslationUtils.javaObjectToZetaSqlValue(
+            ZetaSqlBeamTranslationUtils.toZetaSqlValue(
                 row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
       }
 
       Value v = exp.execute(columns, nullParams);
       if (!v.isNull()) {
-        Row outputRow =
-            ZetaSqlBeamTranslationUtils.zetaSqlStructValueToBeamRow(
-                v, outputSchema, verifyRowValues);
+        Row outputRow = ZetaSqlBeamTranslationUtils.toBeamRow(v, outputSchema, verifyRowValues);
         c.output(outputRow);
       }
     }
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index dbab34a..5a0762d 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -56,7 +56,7 @@
   private ZetaSqlBeamTranslationUtils() {}
 
   // Type conversion: Beam => ZetaSQL
-  public static Type beamFieldTypeToZetaSqlType(FieldType fieldType) {
+  public static Type toZetaSqlType(FieldType fieldType) {
     switch (fieldType.getTypeName()) {
       case INT64:
         return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
@@ -73,52 +73,42 @@
       case DATETIME:
         // TODO[BEAM-10238]: Mapping TIMESTAMP to a Beam LogicalType instead?
         return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
-      case ARRAY:
-        return beamElementFieldTypeToZetaSqlArrayType(fieldType.getCollectionElementType());
-      case ROW:
-        return beamSchemaToZetaSqlStructType(fieldType.getRowSchema());
       case LOGICAL_TYPE:
-        return beamLogicalTypeToZetaSqlType(fieldType.getLogicalType().getIdentifier());
+        String identifier = fieldType.getLogicalType().getIdentifier();
+        if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+          return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
+        } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+          return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
+        } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+          return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
+        } else {
+          throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+        }
+      case ARRAY:
+        return toZetaSqlArrayType(fieldType.getCollectionElementType());
+      case ROW:
+        return toZetaSqlStructType(fieldType.getRowSchema());
       default:
         throw new UnsupportedOperationException(
             "Unknown Beam fieldType: " + fieldType.getTypeName());
     }
   }
 
-  private static ArrayType beamElementFieldTypeToZetaSqlArrayType(FieldType elementFieldType) {
-    return TypeFactory.createArrayType(beamFieldTypeToZetaSqlType(elementFieldType));
+  private static ArrayType toZetaSqlArrayType(FieldType elementFieldType) {
+    return TypeFactory.createArrayType(toZetaSqlType(elementFieldType));
   }
 
-  public static StructType beamSchemaToZetaSqlStructType(Schema schema) {
+  public static StructType toZetaSqlStructType(Schema schema) {
     return TypeFactory.createStructType(
         schema.getFields().stream()
-            .map(ZetaSqlBeamTranslationUtils::beamFieldToZetaSqlStructField)
+            .map(f -> new StructField(f.getName(), toZetaSqlType(f.getType())))
             .collect(Collectors.toList()));
   }
 
-  private static StructField beamFieldToZetaSqlStructField(Field field) {
-    return new StructField(field.getName(), beamFieldTypeToZetaSqlType(field.getType()));
-  }
-
-  private static Type beamLogicalTypeToZetaSqlType(String identifier) {
-    if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-      // Date type
-      return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
-    } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
-      // Time type
-      return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
-    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
-      // DateTime type
-      return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
-    } else {
-      throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
-    }
-  }
-
   // Value conversion: Beam => ZetaSQL
-  public static Value javaObjectToZetaSqlValue(Object object, FieldType fieldType) {
+  public static Value toZetaSqlValue(Object object, FieldType fieldType) {
     if (object == null) {
-      return Value.createNullValue(beamFieldTypeToZetaSqlType(fieldType));
+      return Value.createNullValue(toZetaSqlType(fieldType));
     }
     switch (fieldType.getTypeName()) {
       case INT64:
@@ -134,81 +124,69 @@
       case DECIMAL:
         return Value.createNumericValue((BigDecimal) object);
       case DATETIME:
-        return jodaInstantToZetaSqlTimestampValue((Instant) object);
-      case ARRAY:
-        return javaListToZetaSqlArrayValue(
-            (List<Object>) object, fieldType.getCollectionElementType());
-      case ROW:
-        return beamRowToZetaSqlStructValue((Row) object, fieldType.getRowSchema());
+        return Value.createTimestampValueFromUnixMicros(
+            LongMath.checkedMultiply(((Instant) object).getMillis(), MICROS_PER_MILLI));
       case LOGICAL_TYPE:
-        return beamLogicalObjectToZetaSqlValue(object, fieldType.getLogicalType().getIdentifier());
+        String identifier = fieldType.getLogicalType().getIdentifier();
+        if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+          if (object instanceof Long) { // base type
+            return Value.createDateValue(((Long) object).intValue());
+          } else { // input type
+            return Value.createDateValue((int) ((LocalDate) object).toEpochDay());
+          }
+        } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+          if (object instanceof Long) { // base type
+            return Value.createTimeValue(
+                CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.ofNanoOfDay((Long) object)));
+          } else { // input type
+            return Value.createTimeValue(
+                CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
+          }
+        } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+          LocalDateTime datetime;
+          if (object instanceof Row) { // base type
+            datetime =
+                LocalDateTime.of(
+                    LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)),
+                    LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME)));
+          } else { // input type
+            datetime = (LocalDateTime) object;
+          }
+          // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
+          return Value.createDatetimeValue(
+              CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
+        } else {
+          throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+        }
+      case ARRAY:
+        return toZetaSqlArrayValue((List<Object>) object, fieldType.getCollectionElementType());
+      case ROW:
+        return toZetaSqlStructValue((Row) object, fieldType.getRowSchema());
       default:
         throw new UnsupportedOperationException(
             "Unknown Beam fieldType: " + fieldType.getTypeName());
     }
   }
 
-  private static Value jodaInstantToZetaSqlTimestampValue(Instant instant) {
-    return Value.createTimestampValueFromUnixMicros(
-        LongMath.checkedMultiply(instant.getMillis(), MICROS_PER_MILLI));
-  }
-
-  private static Value javaListToZetaSqlArrayValue(List<Object> elements, FieldType elementType) {
+  private static Value toZetaSqlArrayValue(List<Object> elements, FieldType elementFieldType) {
     List<Value> values =
         elements.stream()
-            .map(e -> javaObjectToZetaSqlValue(e, elementType))
+            .map(e -> toZetaSqlValue(e, elementFieldType))
             .collect(Collectors.toList());
-    return Value.createArrayValue(beamElementFieldTypeToZetaSqlArrayType(elementType), values);
+    return Value.createArrayValue(toZetaSqlArrayType(elementFieldType), values);
   }
 
-  public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) {
+  public static Value toZetaSqlStructValue(Row row, Schema schema) {
     List<Value> values = new ArrayList<>(row.getFieldCount());
 
     for (int i = 0; i < row.getFieldCount(); i++) {
-      values.add(
-          javaObjectToZetaSqlValue(
-              row.getBaseValue(i, Object.class), schema.getField(i).getType()));
+      values.add(toZetaSqlValue(row.getBaseValue(i, Object.class), schema.getField(i).getType()));
     }
-    return Value.createStructValue(beamSchemaToZetaSqlStructType(schema), values);
-  }
-
-  private static Value beamLogicalObjectToZetaSqlValue(Object object, String identifier) {
-    if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-      // Date value
-      if (object instanceof Long) { // base type
-        return Value.createDateValue(((Long) object).intValue());
-      } else { // input type
-        return Value.createDateValue((int) ((LocalDate) object).toEpochDay());
-      }
-    } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
-      // Time value
-      if (object instanceof Long) { // base type
-        return Value.createTimeValue(
-            CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.ofNanoOfDay((Long) object)));
-      } else { // input type
-        return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
-      }
-    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
-      // DateTime value
-      LocalDateTime datetime;
-      if (object instanceof Row) { // base type
-        datetime =
-            LocalDateTime.of(
-                LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)),
-                LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME)));
-      } else { // input type
-        datetime = (LocalDateTime) object;
-      }
-      // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
-      return Value.createDatetimeValue(
-          CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
-    } else {
-      throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
-    }
+    return Value.createStructValue(toZetaSqlStructType(schema), values);
   }
 
   // Type conversion: ZetaSQL => Beam
-  public static FieldType zetaSqlTypeToBeamFieldType(Type type) {
+  public static FieldType toBeamType(Type type) {
     switch (type.getKind()) {
       case TYPE_INT64:
         return FieldType.INT64.withNullable(true);
@@ -231,39 +209,25 @@
       case TYPE_TIMESTAMP:
         return FieldType.DATETIME.withNullable(true);
       case TYPE_ARRAY:
-        return zetaSqlElementTypeToBeamArrayType(type.asArray().getElementType());
+        return FieldType.array(toBeamType(type.asArray().getElementType())).withNullable(true);
       case TYPE_STRUCT:
-        return zetaSqlStructTypeToBeamRowType(type.asStruct());
+        return FieldType.row(
+                type.asStruct().getFieldList().stream()
+                    .map(f -> Field.of(f.getName(), toBeamType(f.getType())))
+                    .collect(Schema.toSchema()))
+            .withNullable(true);
       default:
         throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind());
     }
   }
 
-  private static FieldType zetaSqlElementTypeToBeamArrayType(Type elementType) {
-    return FieldType.array(zetaSqlTypeToBeamFieldType(elementType)).withNullable(true);
-  }
-
-  private static FieldType zetaSqlStructTypeToBeamRowType(StructType structType) {
-    return FieldType.row(
-            structType.getFieldList().stream()
-                .map(ZetaSqlBeamTranslationUtils::zetaSqlStructFieldToBeamField)
-                .collect(Schema.toSchema()))
-        .withNullable(true);
-  }
-
-  private static Field zetaSqlStructFieldToBeamField(StructField structField) {
-    return Field.of(structField.getName(), zetaSqlTypeToBeamFieldType(structField.getType()));
-  }
-
   // Value conversion: ZetaSQL => Beam (target Beam type unknown)
-  public static Object zetaSqlValueToJavaObject(Value value, boolean verifyValues) {
-    return zetaSqlValueToJavaObject(
-        value, zetaSqlTypeToBeamFieldType(value.getType()), verifyValues);
+  public static Object toBeamObject(Value value, boolean verifyValues) {
+    return toBeamObject(value, toBeamType(value.getType()), verifyValues);
   }
 
   // Value conversion: ZetaSQL => Beam (target Beam type known)
-  public static Object zetaSqlValueToJavaObject(
-      Value value, FieldType fieldType, boolean verifyValues) {
+  public static Object toBeamObject(Value value, FieldType fieldType, boolean verifyValues) {
     if (value.isNull()) {
       return null;
     }
@@ -286,39 +250,40 @@
       case DECIMAL:
         return value.getNumericValue();
       case DATETIME:
-        return zetaSqlTimestampValueToJodaInstant(value);
-      case ARRAY:
-        return zetaSqlArrayValueToJavaList(
-            value, fieldType.getCollectionElementType(), verifyValues);
-      case ROW:
-        return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues);
+        return Instant.ofEpochMilli(value.getTimestampUnixMicros() / MICROS_PER_MILLI);
       case LOGICAL_TYPE:
-        return zetaSqlValueToBeamLogicalObject(value, fieldType.getLogicalType().getIdentifier());
+        String identifier = fieldType.getLogicalType().getIdentifier();
+        if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+          return LocalDate.ofEpochDay(value.getDateValue());
+        } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+          return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+        } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+          return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+        } else {
+          throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+        }
+      case ARRAY:
+        return toBeamList(value, fieldType.getCollectionElementType(), verifyValues);
+      case ROW:
+        return toBeamRow(value, fieldType.getRowSchema(), verifyValues);
       default:
         throw new UnsupportedOperationException(
             "Unknown Beam fieldType: " + fieldType.getTypeName());
     }
   }
 
-  private static Instant zetaSqlTimestampValueToJodaInstant(Value timestampValue) {
-    long millis = timestampValue.getTimestampUnixMicros() / MICROS_PER_MILLI;
-    return Instant.ofEpochMilli(millis);
-  }
-
-  private static List<Object> zetaSqlArrayValueToJavaList(
+  private static List<Object> toBeamList(
       Value arrayValue, FieldType elementType, boolean verifyValues) {
     return arrayValue.getElementList().stream()
-        .map(e -> zetaSqlValueToJavaObject(e, elementType, verifyValues))
+        .map(e -> toBeamObject(e, elementType, verifyValues))
         .collect(Collectors.toList());
   }
 
-  public static Row zetaSqlStructValueToBeamRow(
-      Value structValue, Schema schema, boolean verifyValues) {
+  public static Row toBeamRow(Value structValue, Schema schema, boolean verifyValues) {
     List<Object> objects = new ArrayList<>(schema.getFieldCount());
     List<Value> values = structValue.getFieldList();
     for (int i = 0; i < values.size(); i++) {
-      objects.add(
-          zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType(), verifyValues));
+      objects.add(toBeamObject(values.get(i), schema.getField(i).getType(), verifyValues));
     }
     Row row =
         verifyValues
@@ -326,19 +291,4 @@
             : Row.withSchema(schema).attachValues(objects);
     return row;
   }
-
-  private static Object zetaSqlValueToBeamLogicalObject(Value value, String identifier) {
-    if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-      // Date value
-      return LocalDate.ofEpochDay(value.getDateValue());
-    } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
-      // Time value
-      return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
-    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
-      // DateTime value
-      return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
-    } else {
-      throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
-    }
-  }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
index 7b450fb..e6e54cf 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
@@ -137,21 +137,17 @@
 
   @Test
   public void testBeamFieldTypeToZetaSqlType() {
-    assertEquals(
-        ZetaSqlBeamTranslationUtils.beamFieldTypeToZetaSqlType(TEST_FIELD_TYPE), TEST_TYPE);
+    assertEquals(ZetaSqlBeamTranslationUtils.toZetaSqlType(TEST_FIELD_TYPE), TEST_TYPE);
   }
 
   @Test
   public void testJavaObjectToZetaSqlValue() {
-    assertEquals(
-        ZetaSqlBeamTranslationUtils.javaObjectToZetaSqlValue(TEST_ROW, TEST_FIELD_TYPE),
-        TEST_VALUE);
+    assertEquals(ZetaSqlBeamTranslationUtils.toZetaSqlValue(TEST_ROW, TEST_FIELD_TYPE), TEST_VALUE);
   }
 
   @Test
   public void testZetaSqlValueToJavaObject() {
     assertEquals(
-        ZetaSqlBeamTranslationUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE, true),
-        TEST_ROW);
+        ZetaSqlBeamTranslationUtils.toBeamObject(TEST_VALUE, TEST_FIELD_TYPE, true), TEST_ROW);
   }
 }