ToJson should support logical types
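
ToJson previously required the caller to pass the row schema explicitly via
withSchema(Schema), and RowJson could not serialize LOGICAL_TYPE fields. With this
change, ToJson reads the schema from the input PCollection, logical-type values are
written and validated through their base FieldType, and the MongoDB SQL table's
RowToDocument no longer needs a schema of its own.

Intended usage after this change, as a minimal sketch (imports omitted; the pipeline,
schema, and values below are illustrative and not part of this diff):

    // Any schema-aware PCollection works; fields backed by logical types are
    // serialized using their base FieldType.
    Pipeline pipeline = Pipeline.create();
    Schema schema = Schema.builder().addStringField("name").addInt32Field("count").build();
    PCollection<String> json =
        pipeline
            .apply(
                Create.of(Row.withSchema(schema).addValues("apple", 3).build())
                    .withRowSchema(schema))
            .apply(ToJson.of()); // no withSchema(...) needed anymore
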
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
index d10de20..edeea36 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
@@ -40,7 +40,6 @@
@Experimental
public class ToJson<T> extends PTransform<PCollection<T>, PCollection<String>> {
private transient volatile @Nullable ObjectMapper objectMapper;
- private Schema inputSchema;
private ToJson() {}
@@ -48,21 +47,17 @@
return new ToJson<>();
}
- public ToJson<T> withSchema(Schema schema) {
- inputSchema = schema;
- return this;
- }
-
@Override
public PCollection<String> expand(PCollection<T> rows) {
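+ // Read the schema from the input PCollection; an explicit withSchema(Schema) is no longer needed.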
+ Schema inputSchema = rows.getSchema();
SerializableFunction<T, Row> toRow = rows.getToRowFunction();
- final Schema schema = this.inputSchema == null ? rows.getSchema() : this.inputSchema;
return rows.apply(
ParDo.of(
new DoFn<T, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
- context.output(rowToJson(objectMapper(schema), toRow.apply(context.element())));
+ context.output(
+ rowToJson(objectMapper(inputSchema), toRow.apply(context.element())));
}
}));
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
index 3c515fb..ae63967 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
@@ -324,20 +324,10 @@
gen.writeNumber((byte) value);
break;
case DOUBLE:
- // BigDecimal cannot be cast to double directly
- if (value instanceof BigDecimal) {
- gen.writeNumber(((BigDecimal) value).doubleValue());
- } else {
- gen.writeNumber((double) value);
- }
+ gen.writeNumber((double) value);
break;
case FLOAT:
- // BigDecimal cannot be cast to float directly
- if (value instanceof BigDecimal) {
- gen.writeNumber(((BigDecimal) value).floatValue());
- } else {
- gen.writeNumber((float) value);
- }
+ gen.writeNumber((float) value);
break;
case INT16:
gen.writeNumber((short) value);
@@ -361,6 +351,9 @@
case ROW:
writeRow((Row) value, type.getRowSchema(), gen);
break;
+ case LOGICAL_TYPE:
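+ // Serialize the value using the logical type's base FieldType.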
+ writeValue(gen, type.getLogicalType().getBaseType(), value);
+ break;
default:
throw new IllegalArgumentException("Unsupported field type: " + type);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
index 69cd1d3..af929ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
@@ -59,6 +59,11 @@
return;
}
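+ // A logical type is supported whenever its base FieldType is supported.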
+ if (fieldTypeName.isLogicalType()) {
+ verifyFieldTypeSupported(fieldType.getLogicalType().getBaseType());
+ return;
+ }
+
if (!SUPPORTED_TYPES.contains(fieldTypeName)) {
throw new RowJson.RowJsonDeserializer.UnsupportedRowJsonException(
fieldTypeName.name()
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
index e01a1c5..7b8ce03 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
@@ -87,7 +87,7 @@
@Override
public POutput buildIOWriter(PCollection<Row> input) {
return input
- .apply(RowToDocument.withSchema(getSchema()))
+ .apply(RowToDocument.convert())
.apply(MongoDbIO.write().withUri(dbUri).withDatabase(dbName).withCollection(dbCollection));
}
@@ -147,20 +147,18 @@
}
public static class RowToDocument extends PTransform<PCollection<Row>, PCollection<Document>> {
- private final Schema schema;
- private RowToDocument(Schema schema) {
- this.schema = schema;
- }
+ private RowToDocument() {}
- public static RowToDocument withSchema(Schema schema) {
- return new RowToDocument(schema);
+ public static RowToDocument convert() {
+ return new RowToDocument();
}
@Override
public PCollection<Document> expand(PCollection<Row> input) {
return input
- .apply("Transform Rows to JSON", ToJson.<Row>of().withSchema(schema))
+ // TODO(BEAM-8498): figure out a way to convert Row directly to Document.
+ .apply("Transform Rows to JSON", ToJson.of())
.apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java
index cccac9c..237b34d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java
@@ -28,6 +28,7 @@
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable.DocumentToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable.RowToDocument;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.PAssert;
@@ -78,7 +79,7 @@
PCollection<Row> output =
pipeline
.apply("Create document from JSON", Create.<Document>of(Document.parse(JSON_ROW)))
- .apply("CConvert document to Row", DocumentToRow.withSchema(SCHEMA));
+ .apply("Convert document to Row", DocumentToRow.withSchema(SCHEMA));
// Make sure proper rows are constructed from JSON.
PAssert.that(output)
@@ -99,6 +100,35 @@
pipeline.run().waitUntilFinish();
}
+ @Test
+ public void testRowToDocumentConverter() {
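+ // Build a Row mirroring JSON_ROW and verify RowToDocument produces the equivalent Document.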
+ PCollection<Document> output =
+ pipeline
+ .apply(
+ "Create a row",
+ Create.of(
+ row(
+ SCHEMA,
+ 9223372036854775807L,
+ 2147483647,
+ (short) 32767,
+ (byte) 127,
+ true,
+ 1.0,
+ (float) 1.0,
+ "string",
+ row(
+ Schema.builder().addNullableField("int32", INT32).build(),
+ 2147483645),
+ Arrays.asList("str1", "str2", "str3")))
+ .withRowSchema(SCHEMA))
+ .apply("Convert row to document", RowToDocument.convert());
+
+ PAssert.that(output).containsInAnyOrder(Document.parse(JSON_ROW));
+
+ pipeline.run().waitUntilFinish();
+ }
+
private Row row(Schema schema, Object... values) {
return Row.withSchema(schema).addValues(values).build();
}