Simplify ZetaSqlBeamTranslationUtils
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index bff1202..460e600 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -167,8 +167,7 @@
for (int i = 0; i < inputSchema.getFieldCount(); i++) {
options.addExpressionColumn(
columnName(i),
- ZetaSqlBeamTranslationUtils.beamFieldTypeToZetaSqlType(
- inputSchema.getField(i).getType()));
+ ZetaSqlBeamTranslationUtils.toZetaSqlType(inputSchema.getField(i).getType()));
}
exp = new PreparedExpression(sql);
@@ -182,15 +181,13 @@
for (int i = 0; i < inputSchema.getFieldCount(); i++) {
columns.put(
columnName(i),
- ZetaSqlBeamTranslationUtils.javaObjectToZetaSqlValue(
+ ZetaSqlBeamTranslationUtils.toZetaSqlValue(
row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
}
Value v = exp.execute(columns, nullParams);
if (!v.isNull()) {
- Row outputRow =
- ZetaSqlBeamTranslationUtils.zetaSqlStructValueToBeamRow(
- v, outputSchema, verifyRowValues);
+ Row outputRow = ZetaSqlBeamTranslationUtils.toBeamRow(v, outputSchema, verifyRowValues);
c.output(outputRow);
}
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index dbab34a..5a0762d 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -56,7 +56,7 @@
private ZetaSqlBeamTranslationUtils() {}
// Type conversion: Beam => ZetaSQL
- public static Type beamFieldTypeToZetaSqlType(FieldType fieldType) {
+ public static Type toZetaSqlType(FieldType fieldType) {
switch (fieldType.getTypeName()) {
case INT64:
return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
@@ -73,52 +73,42 @@
case DATETIME:
// TODO[BEAM-10238]: Mapping TIMESTAMP to a Beam LogicalType instead?
return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
- case ARRAY:
- return beamElementFieldTypeToZetaSqlArrayType(fieldType.getCollectionElementType());
- case ROW:
- return beamSchemaToZetaSqlStructType(fieldType.getRowSchema());
case LOGICAL_TYPE:
- return beamLogicalTypeToZetaSqlType(fieldType.getLogicalType().getIdentifier());
+ String identifier = fieldType.getLogicalType().getIdentifier();
+ if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+ return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
+ } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+ return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+ return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
+ } else {
+ throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+ }
+ case ARRAY:
+ return toZetaSqlArrayType(fieldType.getCollectionElementType());
+ case ROW:
+ return toZetaSqlStructType(fieldType.getRowSchema());
default:
throw new UnsupportedOperationException(
"Unknown Beam fieldType: " + fieldType.getTypeName());
}
}
- private static ArrayType beamElementFieldTypeToZetaSqlArrayType(FieldType elementFieldType) {
- return TypeFactory.createArrayType(beamFieldTypeToZetaSqlType(elementFieldType));
+ private static ArrayType toZetaSqlArrayType(FieldType elementFieldType) {
+ return TypeFactory.createArrayType(toZetaSqlType(elementFieldType));
}
- public static StructType beamSchemaToZetaSqlStructType(Schema schema) {
+ public static StructType toZetaSqlStructType(Schema schema) {
return TypeFactory.createStructType(
schema.getFields().stream()
- .map(ZetaSqlBeamTranslationUtils::beamFieldToZetaSqlStructField)
+ .map(f -> new StructField(f.getName(), toZetaSqlType(f.getType())))
.collect(Collectors.toList()));
}
- private static StructField beamFieldToZetaSqlStructField(Field field) {
- return new StructField(field.getName(), beamFieldTypeToZetaSqlType(field.getType()));
- }
-
- private static Type beamLogicalTypeToZetaSqlType(String identifier) {
- if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
- // Date type
- return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
- } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
- // Time type
- return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
- } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
- // DateTime type
- return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
- } else {
- throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
- }
- }
-
// Value conversion: Beam => ZetaSQL
- public static Value javaObjectToZetaSqlValue(Object object, FieldType fieldType) {
+ public static Value toZetaSqlValue(Object object, FieldType fieldType) {
if (object == null) {
- return Value.createNullValue(beamFieldTypeToZetaSqlType(fieldType));
+ return Value.createNullValue(toZetaSqlType(fieldType));
}
switch (fieldType.getTypeName()) {
case INT64:
@@ -134,81 +124,69 @@
case DECIMAL:
return Value.createNumericValue((BigDecimal) object);
case DATETIME:
- return jodaInstantToZetaSqlTimestampValue((Instant) object);
- case ARRAY:
- return javaListToZetaSqlArrayValue(
- (List<Object>) object, fieldType.getCollectionElementType());
- case ROW:
- return beamRowToZetaSqlStructValue((Row) object, fieldType.getRowSchema());
+ return Value.createTimestampValueFromUnixMicros(
+ LongMath.checkedMultiply(((Instant) object).getMillis(), MICROS_PER_MILLI));
case LOGICAL_TYPE:
- return beamLogicalObjectToZetaSqlValue(object, fieldType.getLogicalType().getIdentifier());
+ String identifier = fieldType.getLogicalType().getIdentifier();
+ if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+ if (object instanceof Long) { // base type
+ return Value.createDateValue(((Long) object).intValue());
+ } else { // input type
+ return Value.createDateValue((int) ((LocalDate) object).toEpochDay());
+ }
+ } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+ if (object instanceof Long) { // base type
+ return Value.createTimeValue(
+ CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.ofNanoOfDay((Long) object)));
+ } else { // input type
+ return Value.createTimeValue(
+ CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
+ }
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+ LocalDateTime datetime;
+ if (object instanceof Row) { // base type
+ datetime =
+ LocalDateTime.of(
+ LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)),
+ LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME)));
+ } else { // input type
+ datetime = (LocalDateTime) object;
+ }
+ // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
+ return Value.createDatetimeValue(
+ CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
+ } else {
+ throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+ }
+ case ARRAY:
+ return toZetaSqlArrayValue((List<Object>) object, fieldType.getCollectionElementType());
+ case ROW:
+ return toZetaSqlStructValue((Row) object, fieldType.getRowSchema());
default:
throw new UnsupportedOperationException(
"Unknown Beam fieldType: " + fieldType.getTypeName());
}
}
- private static Value jodaInstantToZetaSqlTimestampValue(Instant instant) {
- return Value.createTimestampValueFromUnixMicros(
- LongMath.checkedMultiply(instant.getMillis(), MICROS_PER_MILLI));
- }
-
- private static Value javaListToZetaSqlArrayValue(List<Object> elements, FieldType elementType) {
+ private static Value toZetaSqlArrayValue(List<Object> elements, FieldType elementFieldType) {
List<Value> values =
elements.stream()
- .map(e -> javaObjectToZetaSqlValue(e, elementType))
+ .map(e -> toZetaSqlValue(e, elementFieldType))
.collect(Collectors.toList());
- return Value.createArrayValue(beamElementFieldTypeToZetaSqlArrayType(elementType), values);
+ return Value.createArrayValue(toZetaSqlArrayType(elementFieldType), values);
}
- public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) {
+ public static Value toZetaSqlStructValue(Row row, Schema schema) {
List<Value> values = new ArrayList<>(row.getFieldCount());
for (int i = 0; i < row.getFieldCount(); i++) {
- values.add(
- javaObjectToZetaSqlValue(
- row.getBaseValue(i, Object.class), schema.getField(i).getType()));
+ values.add(toZetaSqlValue(row.getBaseValue(i, Object.class), schema.getField(i).getType()));
}
- return Value.createStructValue(beamSchemaToZetaSqlStructType(schema), values);
- }
-
- private static Value beamLogicalObjectToZetaSqlValue(Object object, String identifier) {
- if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
- // Date value
- if (object instanceof Long) { // base type
- return Value.createDateValue(((Long) object).intValue());
- } else { // input type
- return Value.createDateValue((int) ((LocalDate) object).toEpochDay());
- }
- } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
- // Time value
- if (object instanceof Long) { // base type
- return Value.createTimeValue(
- CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.ofNanoOfDay((Long) object)));
- } else { // input type
- return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
- }
- } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
- // DateTime value
- LocalDateTime datetime;
- if (object instanceof Row) { // base type
- datetime =
- LocalDateTime.of(
- LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)),
- LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME)));
- } else { // input type
- datetime = (LocalDateTime) object;
- }
- // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
- return Value.createDatetimeValue(
- CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
- } else {
- throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
- }
+ return Value.createStructValue(toZetaSqlStructType(schema), values);
}
// Type conversion: ZetaSQL => Beam
- public static FieldType zetaSqlTypeToBeamFieldType(Type type) {
+ public static FieldType toBeamType(Type type) {
switch (type.getKind()) {
case TYPE_INT64:
return FieldType.INT64.withNullable(true);
@@ -231,39 +209,25 @@
case TYPE_TIMESTAMP:
return FieldType.DATETIME.withNullable(true);
case TYPE_ARRAY:
- return zetaSqlElementTypeToBeamArrayType(type.asArray().getElementType());
+ return FieldType.array(toBeamType(type.asArray().getElementType())).withNullable(true);
case TYPE_STRUCT:
- return zetaSqlStructTypeToBeamRowType(type.asStruct());
+ return FieldType.row(
+ type.asStruct().getFieldList().stream()
+ .map(f -> Field.of(f.getName(), toBeamType(f.getType())))
+ .collect(Schema.toSchema()))
+ .withNullable(true);
default:
throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind());
}
}
- private static FieldType zetaSqlElementTypeToBeamArrayType(Type elementType) {
- return FieldType.array(zetaSqlTypeToBeamFieldType(elementType)).withNullable(true);
- }
-
- private static FieldType zetaSqlStructTypeToBeamRowType(StructType structType) {
- return FieldType.row(
- structType.getFieldList().stream()
- .map(ZetaSqlBeamTranslationUtils::zetaSqlStructFieldToBeamField)
- .collect(Schema.toSchema()))
- .withNullable(true);
- }
-
- private static Field zetaSqlStructFieldToBeamField(StructField structField) {
- return Field.of(structField.getName(), zetaSqlTypeToBeamFieldType(structField.getType()));
- }
-
// Value conversion: ZetaSQL => Beam (target Beam type unknown)
- public static Object zetaSqlValueToJavaObject(Value value, boolean verifyValues) {
- return zetaSqlValueToJavaObject(
- value, zetaSqlTypeToBeamFieldType(value.getType()), verifyValues);
+ public static Object toBeamObject(Value value, boolean verifyValues) {
+ return toBeamObject(value, toBeamType(value.getType()), verifyValues);
}
// Value conversion: ZetaSQL => Beam (target Beam type known)
- public static Object zetaSqlValueToJavaObject(
- Value value, FieldType fieldType, boolean verifyValues) {
+ public static Object toBeamObject(Value value, FieldType fieldType, boolean verifyValues) {
if (value.isNull()) {
return null;
}
@@ -286,39 +250,40 @@
case DECIMAL:
return value.getNumericValue();
case DATETIME:
- return zetaSqlTimestampValueToJodaInstant(value);
- case ARRAY:
- return zetaSqlArrayValueToJavaList(
- value, fieldType.getCollectionElementType(), verifyValues);
- case ROW:
- return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues);
+ return Instant.ofEpochMilli(value.getTimestampUnixMicros() / MICROS_PER_MILLI);
case LOGICAL_TYPE:
- return zetaSqlValueToBeamLogicalObject(value, fieldType.getLogicalType().getIdentifier());
+ String identifier = fieldType.getLogicalType().getIdentifier();
+ if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+ return LocalDate.ofEpochDay(value.getDateValue());
+ } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+ return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+ return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+ } else {
+ throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+ }
+ case ARRAY:
+ return toBeamList(value, fieldType.getCollectionElementType(), verifyValues);
+ case ROW:
+ return toBeamRow(value, fieldType.getRowSchema(), verifyValues);
default:
throw new UnsupportedOperationException(
"Unknown Beam fieldType: " + fieldType.getTypeName());
}
}
- private static Instant zetaSqlTimestampValueToJodaInstant(Value timestampValue) {
- long millis = timestampValue.getTimestampUnixMicros() / MICROS_PER_MILLI;
- return Instant.ofEpochMilli(millis);
- }
-
- private static List<Object> zetaSqlArrayValueToJavaList(
+ private static List<Object> toBeamList(
Value arrayValue, FieldType elementType, boolean verifyValues) {
return arrayValue.getElementList().stream()
- .map(e -> zetaSqlValueToJavaObject(e, elementType, verifyValues))
+ .map(e -> toBeamObject(e, elementType, verifyValues))
.collect(Collectors.toList());
}
- public static Row zetaSqlStructValueToBeamRow(
- Value structValue, Schema schema, boolean verifyValues) {
+ public static Row toBeamRow(Value structValue, Schema schema, boolean verifyValues) {
List<Object> objects = new ArrayList<>(schema.getFieldCount());
List<Value> values = structValue.getFieldList();
for (int i = 0; i < values.size(); i++) {
- objects.add(
- zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType(), verifyValues));
+ objects.add(toBeamObject(values.get(i), schema.getField(i).getType(), verifyValues));
}
Row row =
verifyValues
@@ -326,19 +291,4 @@
: Row.withSchema(schema).attachValues(objects);
return row;
}
-
- private static Object zetaSqlValueToBeamLogicalObject(Value value, String identifier) {
- if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
- // Date value
- return LocalDate.ofEpochDay(value.getDateValue());
- } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
- // Time value
- return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
- } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
- // DateTime value
- return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
- } else {
- throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
- }
- }
}
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
index 7b450fb..e6e54cf 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
@@ -137,21 +137,17 @@
@Test
public void testBeamFieldTypeToZetaSqlType() {
- assertEquals(
- ZetaSqlBeamTranslationUtils.beamFieldTypeToZetaSqlType(TEST_FIELD_TYPE), TEST_TYPE);
+ assertEquals(ZetaSqlBeamTranslationUtils.toZetaSqlType(TEST_FIELD_TYPE), TEST_TYPE);
}
@Test
public void testJavaObjectToZetaSqlValue() {
- assertEquals(
- ZetaSqlBeamTranslationUtils.javaObjectToZetaSqlValue(TEST_ROW, TEST_FIELD_TYPE),
- TEST_VALUE);
+ assertEquals(ZetaSqlBeamTranslationUtils.toZetaSqlValue(TEST_ROW, TEST_FIELD_TYPE), TEST_VALUE);
}
@Test
public void testZetaSqlValueToJavaObject() {
assertEquals(
- ZetaSqlBeamTranslationUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE, true),
- TEST_ROW);
+ ZetaSqlBeamTranslationUtils.toBeamObject(TEST_VALUE, TEST_FIELD_TYPE, true), TEST_ROW);
}
}