ToJson should support logical types
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
index d10de20..edeea36 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java
@@ -40,7 +40,6 @@
 @Experimental
 public class ToJson<T> extends PTransform<PCollection<T>, PCollection<String>> {
   private transient volatile @Nullable ObjectMapper objectMapper;
-  private Schema inputSchema;
 
   private ToJson() {}
 
@@ -48,21 +47,17 @@
     return new ToJson<>();
   }
 
-  public ToJson<T> withSchema(Schema schema) {
-    inputSchema = schema;
-    return this;
-  }
-
   @Override
   public PCollection<String> expand(PCollection<T> rows) {
+    Schema inputSchema = rows.getSchema();
     SerializableFunction<T, Row> toRow = rows.getToRowFunction();
-    final Schema schema = this.inputSchema == null ? rows.getSchema() : this.inputSchema;
     return rows.apply(
         ParDo.of(
             new DoFn<T, String>() {
               @ProcessElement
               public void processElement(ProcessContext context) {
-                context.output(rowToJson(objectMapper(schema), toRow.apply(context.element())));
+                context.output(
+                    rowToJson(objectMapper(inputSchema), toRow.apply(context.element())));
               }
             }));
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
index 3c515fb..ae63967 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
@@ -324,20 +324,10 @@
           gen.writeNumber((byte) value);
           break;
         case DOUBLE:
-          // BigDecimal cannot be cast to double directly
-          if (value instanceof BigDecimal) {
-            gen.writeNumber(((BigDecimal) value).doubleValue());
-          } else {
-            gen.writeNumber((double) value);
-          }
+          gen.writeNumber((double) value);
           break;
         case FLOAT:
-          // BigDecimal cannot be cast to float directly
-          if (value instanceof BigDecimal) {
-            gen.writeNumber(((BigDecimal) value).floatValue());
-          } else {
-            gen.writeNumber((float) value);
-          }
+          gen.writeNumber((float) value);
           break;
         case INT16:
           gen.writeNumber((short) value);
@@ -361,6 +351,9 @@
         case ROW:
           writeRow((Row) value, type.getRowSchema(), gen);
           break;
+        case LOGICAL_TYPE:
+          writeValue(gen, type.getLogicalType().getBaseType(), value);
+          break;
         default:
           throw new IllegalArgumentException("Unsupported field type: " + type);
       }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
index 69cd1d3..af929ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
@@ -59,6 +59,11 @@
       return;
     }
 
+    if (fieldTypeName.isLogicalType()) {
+      verifyFieldTypeSupported(fieldType.getLogicalType().getBaseType());
+      return;
+    }
+
     if (!SUPPORTED_TYPES.contains(fieldTypeName)) {
       throw new RowJson.RowJsonDeserializer.UnsupportedRowJsonException(
           fieldTypeName.name()
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
index e01a1c5..7b8ce03 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java
@@ -87,7 +87,7 @@
   @Override
   public POutput buildIOWriter(PCollection<Row> input) {
     return input
-        .apply(RowToDocument.withSchema(getSchema()))
+        .apply(new RowToDocument())
         .apply(MongoDbIO.write().withUri(dbUri).withDatabase(dbName).withCollection(dbCollection));
   }
 
@@ -147,20 +147,18 @@
   }
 
   public static class RowToDocument extends PTransform<PCollection<Row>, PCollection<Document>> {
-    private final Schema schema;
 
-    private RowToDocument(Schema schema) {
-      this.schema = schema;
-    }
+    private RowToDocument() {}
 
-    public static RowToDocument withSchema(Schema schema) {
-      return new RowToDocument(schema);
+    public static RowToDocument convert() {
+      return new RowToDocument();
     }
 
     @Override
     public PCollection<Document> expand(PCollection<Row> input) {
       return input
-          .apply("Transform Rows to JSON", ToJson.<Row>of().withSchema(schema))
+          // TODO(BEAM-8498): figure out a way convert Row directly to Document.
+          .apply("Transform Rows to JSON", ToJson.of())
           .apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
     }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java
index cccac9c..237b34d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java
@@ -28,6 +28,7 @@
 
 import java.util.Arrays;
 import org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable.DocumentToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable.RowToDocument;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.testing.PAssert;
@@ -78,7 +79,7 @@
     PCollection<Row> output =
         pipeline
             .apply("Create document from JSON", Create.<Document>of(Document.parse(JSON_ROW)))
-            .apply("CConvert document to Row", DocumentToRow.withSchema(SCHEMA));
+            .apply("Convert document to Row", DocumentToRow.withSchema(SCHEMA));
 
     // Make sure proper rows are constructed from JSON.
     PAssert.that(output)
@@ -99,6 +100,35 @@
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testRowToDocumentConverter() {
+    PCollection<Document> output =
+        pipeline
+            .apply(
+                "Create a row",
+                Create.of(
+                        row(
+                            SCHEMA,
+                            9223372036854775807L,
+                            2147483647,
+                            (short) 32767,
+                            (byte) 127,
+                            true,
+                            1.0,
+                            (float) 1.0,
+                            "string",
+                            row(
+                                Schema.builder().addNullableField("int32", INT32).build(),
+                                2147483645),
+                            Arrays.asList("str1", "str2", "str3")))
+                    .withRowSchema(SCHEMA))
+            .apply("Convert row to document", RowToDocument.convert());
+
+    PAssert.that(output).containsInAnyOrder(Document.parse(JSON_ROW));
+
+    pipeline.run().waitUntilFinish();
+  }
+
   private Row row(Schema schema, Object... values) {
     return Row.withSchema(schema).addValues(values).build();
   }