spotlesApply
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
index 56e263e..d10de20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
@@ -40,7 +40,7 @@
@Experimental
public class ToJson<T> extends PTransform<PCollection<T>, PCollection<String>> {
private transient volatile @Nullable ObjectMapper objectMapper;
- private static Schema inputSchema;
+ private Schema inputSchema;
private ToJson() {}
@@ -55,17 +55,14 @@
@Override
public PCollection<String> expand(PCollection<T> rows) {
- if (inputSchema == null) {
- inputSchema = rows.getSchema();
- }
SerializableFunction<T, Row> toRow = rows.getToRowFunction();
+ final Schema schema = this.inputSchema == null ? rows.getSchema() : this.inputSchema;
return rows.apply(
ParDo.of(
new DoFn<T, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
- context.output(
- rowToJson(objectMapper(inputSchema), toRow.apply(context.element())));
+ context.output(rowToJson(objectMapper(schema), toRow.apply(context.element())));
}
}));
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
index b96b35a..3c515fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
@@ -321,7 +321,7 @@
gen.writeString((String) value);
break;
case BYTE:
- gen.writeNumber((byte) (int) value);
+ gen.writeNumber((byte) value);
break;
case DOUBLE:
// BigDecimal cannot be cast to double directly
@@ -340,7 +340,7 @@
}
break;
case INT16:
- gen.writeNumber((short) (int) value);
+ gen.writeNumber((short) value);
break;
case INT32:
gen.writeNumber((int) value);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
index f4a2273..e01a1c5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
@@ -31,9 +31,9 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PBegin;
@@ -87,13 +87,8 @@
@Override
public POutput buildIOWriter(PCollection<Row> input) {
return input
- .apply("Transform Rows to JSON", ToJson.<Row>of().withSchema(getSchema()))
- .apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()))
- .apply(
- MongoDbIO.write()
- .withUri(dbUri)
- .withDatabase(dbName)
- .withCollection(dbCollection));
+ .apply(RowToDocument.withSchema(getSchema()))
+ .apply(MongoDbIO.write().withUri(dbUri).withDatabase(dbName).withCollection(dbCollection));
}
@Override
@@ -151,11 +146,30 @@
}
}
- @VisibleForTesting
- static class ObjectToDocumentFn extends SimpleFunction<String, Document> {
+ public static class RowToDocument extends PTransform<PCollection<Row>, PCollection<Document>> {
+ private final Schema schema;
+
+ private RowToDocument(Schema schema) {
+ this.schema = schema;
+ }
+
+ public static RowToDocument withSchema(Schema schema) {
+ return new RowToDocument(schema);
+ }
+
@Override
- public Document apply(String input) {
- return Document.parse(input);
+ public PCollection<Document> expand(PCollection<Row> input) {
+ return input
+ .apply("Transform Rows to JSON", ToJson.<Row>of().withSchema(schema))
+ .apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
+ }
+
+ @VisibleForTesting
+ static class ObjectToDocumentFn extends SimpleFunction<String, Document> {
+ @Override
+ public Document apply(String input) {
+ return Document.parse(input);
+ }
}
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java
index 2f9d205..51c9a74 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java
@@ -22,4 +22,3 @@
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
-
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java
index c69c1e1..82cafb9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java
@@ -94,7 +94,6 @@
public static void setUp() throws Exception {
PipelineOptionsFactory.register(MongoDBPipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(MongoDBPipelineOptions.class);
- options.setMongoDBHostName("localhost");
}
@AfterClass