spotlesApply
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
index 56e263e..d10de20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
@@ -40,7 +40,7 @@
 @Experimental
 public class ToJson<T> extends PTransform<PCollection<T>, PCollection<String>> {
   private transient volatile @Nullable ObjectMapper objectMapper;
-  private static Schema inputSchema;
+  private Schema inputSchema;
 
   private ToJson() {}
 
@@ -55,17 +55,14 @@
 
   @Override
   public PCollection<String> expand(PCollection<T> rows) {
-    if (inputSchema == null) {
-      inputSchema = rows.getSchema();
-    }
     SerializableFunction<T, Row> toRow = rows.getToRowFunction();
+    final Schema schema = this.inputSchema == null ? rows.getSchema() : this.inputSchema;
     return rows.apply(
         ParDo.of(
             new DoFn<T, String>() {
               @ProcessElement
               public void processElement(ProcessContext context) {
-                context.output(
-                    rowToJson(objectMapper(inputSchema), toRow.apply(context.element())));
+                context.output(rowToJson(objectMapper(schema), toRow.apply(context.element())));
               }
             }));
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
index b96b35a..3c515fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
@@ -321,7 +321,7 @@
           gen.writeString((String) value);
           break;
         case BYTE:
-          gen.writeNumber((byte) (int) value);
+          gen.writeNumber((byte) value);
           break;
         case DOUBLE:
           // BigDecimal cannot be cast to double directly
@@ -340,7 +340,7 @@
           }
           break;
         case INT16:
-          gen.writeNumber((short) (int) value);
+          gen.writeNumber((short) value);
           break;
         case INT32:
           gen.writeNumber((int) value);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
index f4a2273..e01a1c5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
@@ -31,9 +31,9 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.ToJson;
 import org.apache.beam.sdk.values.PBegin;
@@ -87,13 +87,8 @@
   @Override
   public POutput buildIOWriter(PCollection<Row> input) {
     return input
-        .apply("Transform Rows to JSON", ToJson.<Row>of().withSchema(getSchema()))
-        .apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()))
-        .apply(
-        MongoDbIO.write()
-            .withUri(dbUri)
-            .withDatabase(dbName)
-            .withCollection(dbCollection));
+        .apply(RowToDocument.withSchema(getSchema()))
+        .apply(MongoDbIO.write().withUri(dbUri).withDatabase(dbName).withCollection(dbCollection));
   }
 
   @Override
@@ -151,11 +146,30 @@
     }
   }
 
-  @VisibleForTesting
-  static class ObjectToDocumentFn extends SimpleFunction<String, Document> {
+  public static class RowToDocument extends PTransform<PCollection<Row>, PCollection<Document>> {
+    private final Schema schema;
+
+    private RowToDocument(Schema schema) {
+      this.schema = schema;
+    }
+
+    public static RowToDocument withSchema(Schema schema) {
+      return new RowToDocument(schema);
+    }
+
     @Override
-    public Document apply(String input) {
-      return Document.parse(input);
+    public PCollection<Document> expand(PCollection<Row> input) {
+      return input
+          .apply("Transform Rows to JSON", ToJson.<Row>of().withSchema(schema))
+          .apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
+    }
+
+    @VisibleForTesting
+    static class ObjectToDocumentFn extends SimpleFunction<String, Document> {
+      @Override
+      public Document apply(String input) {
+        return Document.parse(input);
+      }
     }
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java
index 2f9d205..51c9a74 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java
@@ -22,4 +22,3 @@
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
-
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java
index c69c1e1..82cafb9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java
@@ -94,7 +94,6 @@
   public static void setUp() throws Exception {
     PipelineOptionsFactory.register(MongoDBPipelineOptions.class);
     options = TestPipeline.testingPipelineOptions().as(MongoDBPipelineOptions.class);
-    options.setMongoDBHostName("localhost");
   }
 
   @AfterClass