[FLINK-24745][format][json] Support Oracle OGG json format

This closes #17657.
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java
new file mode 100644
index 0000000..0329129
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.ogg.OggJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Ogg using JSON encoding. */
+public class OggJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("op_ts");
+
+    // --------------------------------------------------------------------------------------------
+    // Ogg-specific attributes
+    // --------------------------------------------------------------------------------------------
+    private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
+    private final boolean ignoreParseErrors;
+    private final TimestampFormat timestampFormat;
+    private List<String> metadataKeys;
+
+    public OggJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    private static Object readProperty(GenericRowData row, int pos, StringData key) {
+        final GenericMapData map = (GenericMapData) row.getMap(pos);
+        if (map == null) {
+            return null;
+        }
+        return map.get(key);
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new OggJsonDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                ignoreParseErrors,
+                timestampFormat);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {
+        INGESTION_TIMESTAMP(
+                "current_ts",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                true,
+                DataTypes.FIELD("current_ts", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return TimestampData.fromEpochMillis(row.getLong(pos));
+                    }
+                }),
+
+        SOURCE_TIMESTAMP(
+                "op_ts",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                true,
+                DataTypes.FIELD("op_ts", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        final StringData timestamp =
+                                (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
+                        if (timestamp == null) {
+                            return null;
+                        }
+                        return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
+                    }
+                }),
+
+        SOURCE_TABLE(
+                "table",
+                DataTypes.STRING().nullable(),
+                true,
+                DataTypes.FIELD("table", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return readProperty(row, pos, KEY_SOURCE_TABLE);
+                    }
+                }),
+
+        SOURCE_PROPERTIES(
+                "source.properties",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+                        .nullable(),
+                true,
+                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getMap(pos);
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final boolean isJsonPayload;
+
+        final DataTypes.Field requiredJsonField;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(
+                String key,
+                DataType dataType,
+                boolean isJsonPayload,
+                DataTypes.Field requiredJsonField,
+                MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.isJsonPayload = isJsonPayload;
+            this.requiredJsonField = requiredJsonField;
+            this.converter = converter;
+        }
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
new file mode 100644
index 0000000..4a018d5
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * Deserialization schema from Ogg JSON to Flink Table/SQL internal data structure {@link RowData}.
+ * The deserialization schema knows Ogg's schema definition and can extract the database data and
+ * convert into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @see <a href="https://www.oracle.com/cn/middleware/technologies/goldengate/overview.html">Ogg</a>
+ */
+@Internal
+public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_CREATE = "I"; // insert
+    private static final String OP_UPDATE = "U"; // update
+    private static final String OP_DELETE = "D"; // delete
+    private static final String OP_TRUNCATE = "T"; // truncate
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Ogg Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    /** The deserializer to deserialize Ogg JSON data. */
+    private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    public OggJsonDeserializationSchema(
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+        this.jsonDeserializer =
+                new JsonRowDataDeserializationSchema(
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass in the produced type
+                        // info
+                        producedTypeInfo,
+                        false, // ignoreParseErrors already contains the functionality of
+                        // failOnMissingField
+                        ignoreParseErrors,
+                        timestampFormat);
+        this.hasMetadata = requestedMetadata.size() > 0;
+        this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        DataType payload =
+                DataTypes.ROW(
+                        DataTypes.FIELD("before", physicalDataType),
+                        DataTypes.FIELD("after", physicalDataType),
+                        DataTypes.FIELD("op_type", DataTypes.STRING()));
+
+        // append fields that are required for reading metadata in the payload
+        final List<DataTypes.Field> payloadMetadataFields =
+                readableMetadata.stream()
+                        .filter(m -> m.isJsonPayload)
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
+
+        DataType root = payload;
+
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .filter(m -> !m.isJsonPayload)
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
+
+        return (RowType) root.getLogicalType();
+    }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(
+                        m -> {
+                            if (m.isJsonPayload) {
+                                return convertInPayload(jsonRowType, m);
+                            } else {
+                                return convertInRoot(jsonRowType, m);
+                            }
+                        })
+                .toArray(MetadataConverter[]::new);
+    }
+
+    private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
+        final int pos = findFieldPos(metadata, jsonRowType);
+        return new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(GenericRowData root, int unused) {
+                return metadata.converter.convert(root, pos);
+            }
+        };
+    }
+
+    private static MetadataConverter convertInPayload(
+            RowType jsonRowType, ReadableMetadata metadata) {
+        return convertInRoot(jsonRowType, metadata);
+    }
+
+    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
+        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+        try {
+            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+
+            GenericRowData before = (GenericRowData) row.getField(0);
+            GenericRowData after = (GenericRowData) row.getField(1);
+            String op = row.getField(2).toString();
+            if (OP_CREATE.equals(op)) {
+                after.setRowKind(RowKind.INSERT);
+                emitRow(row, after, out);
+            } else if (OP_UPDATE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                emitRow(row, before, out);
+                emitRow(row, after, out);
+            } else if (OP_DELETE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+                }
+                before.setRowKind(RowKind.DELETE);
+                emitRow(row, before, out);
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new IOException(
+                            format(
+                                    "Unknown \"op_type\" value \"%s\". The Ogg JSON message is '%s'",
+                                    op, new String(message)));
+                }
+            }
+        } catch (Throwable t) {
+            // a big try catch to protect the processing.
+            if (!ignoreParseErrors) {
+                throw new IOException(
+                        format("Corrupt Ogg JSON message '%s'.", new String(message)), t);
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private void emitRow(
+            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+        // shortcut in case no output projection is required
+        if (!hasMetadata) {
+            out.collect(physicalRow);
+            return;
+        }
+
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+        }
+
+        out.collect(producedRow);
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OggJsonDeserializationSchema that = (OggJsonDeserializationSchema) o;
+        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && hasMetadata == that.hasMetadata
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+                && ignoreParseErrors == that.ignoreParseErrors;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jsonDeserializer, hasMetadata, producedTypeInfo, ignoreParseErrors);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Converter that extracts a metadata field from the row (root or payload) that comes out of the
+     * JSON schema and converts it to the desired data type.
+     */
+    interface MetadataConverter extends Serializable {
+
+        // Method for top-level access.
+        default Object convert(GenericRowData row) {
+            return convert(row, -1);
+        }
+
+        Object convert(GenericRowData row, int pos);
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
new file mode 100644
index 0000000..f853983
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonFormatOptionsUtil;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.TIMESTAMP_FORMAT;
+
+/**
+ * Format factory for providing configured instances of Ogg JSON to RowData {@link
+ * DeserializationSchema}.
+ */
+@Internal
+public class OggJsonFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "ogg-json";
+
+    /** Validator for ogg decoding format. */
+    private static void validateDecodingFormatOptions(ReadableConfig tableOptions) {
+        JsonFormatOptionsUtil.validateDecodingFormatOptions(tableOptions);
+    }
+
+    /** Validator for ogg encoding format. */
+    private static void validateEncodingFormatOptions(ReadableConfig tableOptions) {
+        JsonFormatOptionsUtil.validateEncodingFormatOptions(tableOptions);
+    }
+
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        validateDecodingFormatOptions(formatOptions);
+
+        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+
+        final TimestampFormat timestampFormat =
+                JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+
+        return new OggJsonDecodingFormat(ignoreParseErrors, timestampFormat);
+    }
+
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        validateEncodingFormatOptions(formatOptions);
+
+        TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+        JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
+                JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
+        String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
+
+        final boolean encodeDecimalAsPlainNumber =
+                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+
+        return new EncodingFormat<SerializationSchema<RowData>>() {
+
+            @Override
+            public ChangelogMode getChangelogMode() {
+                return ChangelogMode.newBuilder()
+                        .addContainedKind(RowKind.INSERT)
+                        .addContainedKind(RowKind.UPDATE_BEFORE)
+                        .addContainedKind(RowKind.UPDATE_AFTER)
+                        .addContainedKind(RowKind.DELETE)
+                        .build();
+            }
+
+            @Override
+            public SerializationSchema<RowData> createRuntimeEncoder(
+                    DynamicTableSink.Context context, DataType consumedDataType) {
+                final RowType rowType = (RowType) consumedDataType.getLogicalType();
+                return new OggJsonSerializationSchema(
+                        rowType,
+                        timestampFormat,
+                        mapNullKeyMode,
+                        mapNullKeyLiteral,
+                        encodeDecimalAsPlainNumber);
+            }
+        };
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(IGNORE_PARSE_ERRORS);
+        options.add(TIMESTAMP_FORMAT);
+        options.add(JSON_MAP_NULL_KEY_MODE);
+        options.add(JSON_MAP_NULL_KEY_LITERAL);
+        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        return options;
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java
new file mode 100644
index 0000000..feeca79
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.formats.json.JsonFormatOptions;
+
+/** Option utils for ogg-json format. */
+@PublicEvolving
+public class OggJsonFormatOptions {
+
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+            JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE =
+            JsonFormatOptions.MAP_NULL_KEY_MODE;
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL =
+            JsonFormatOptions.MAP_NULL_KEY_LITERAL;
+
+    private OggJsonFormatOptions() {}
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
new file mode 100644
index 0000000..2189f86
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Ogg JSON.
+ *
+ * @see <a
+ *     href="https://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-F0FA2781-0802-4530-B1F0-5E102B982EC0.htm#GADBD505">Ogg
+ *     JSON Message</a>
+ */
+public class OggJsonSerializationSchema implements SerializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private static final StringData OP_INSERT = StringData.fromString("I"); // insert
+    private static final StringData OP_DELETE = StringData.fromString("D"); // delete
+
+    /** The serializer to serialize Ogg JSON data. * */
+    private final JsonRowDataSerializationSchema jsonSerializer;
+
+    private transient GenericRowData genericRowData;
+
+    public OggJsonSerializationSchema(
+            RowType rowType,
+            TimestampFormat timestampFormat,
+            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral,
+            boolean encodeDecimalAsPlainNumber) {
+        jsonSerializer =
+                new JsonRowDataSerializationSchema(
+                        createJsonRowType(fromLogicalToDataType(rowType)),
+                        timestampFormat,
+                        mapNullKeyMode,
+                        mapNullKeyLiteral,
+                        encodeDecimalAsPlainNumber);
+    }
+
+    private static RowType createJsonRowType(DataType databaseSchema) {
+        // Ogg JSON contains some other information, e.g. "source", "ts_ms"
+        // but we don't need them.
+        return (RowType)
+                DataTypes.ROW(
+                                DataTypes.FIELD("before", databaseSchema),
+                                DataTypes.FIELD("after", databaseSchema),
+                                DataTypes.FIELD("op_type", DataTypes.STRING()))
+                        .getLogicalType();
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        genericRowData = new GenericRowData(3);
+    }
+
+    @Override
+    public byte[] serialize(RowData rowData) {
+        try {
+            switch (rowData.getRowKind()) {
+                case INSERT:
+                case UPDATE_AFTER:
+                    genericRowData.setField(0, null);
+                    genericRowData.setField(1, rowData);
+                    genericRowData.setField(2, OP_INSERT);
+                    return jsonSerializer.serialize(genericRowData);
+                case UPDATE_BEFORE:
+                case DELETE:
+                    genericRowData.setField(0, rowData);
+                    genericRowData.setField(1, null);
+                    genericRowData.setField(2, OP_DELETE);
+                    return jsonSerializer.serialize(genericRowData);
+                default:
+                    throw new UnsupportedOperationException(
+                            format(
+                                    "Unsupported operation '%s' for row kind.",
+                                    rowData.getRowKind()));
+            }
+        } catch (Throwable t) {
+            throw new RuntimeException(format("Could not serialize row '%s'.", rowData), t);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OggJsonSerializationSchema that = (OggJsonSerializationSchema) o;
+        return Objects.equals(jsonSerializer, that.jsonSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jsonSerializer);
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 08f4657..35e647d 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -17,3 +17,4 @@
 org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
 org.apache.flink.formats.json.canal.CanalJsonFormatFactory
 org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory
+org.apache.flink.formats.json.ogg.OggJsonFormatFactory
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java
new file mode 100644
index 0000000..eaac0d3
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/** Test Filesystem connector with DebeziumJson. */
+public class OggJsonFileSystemITCase extends StreamingTestBase {
+
+    private static final List<String> EXPECTED =
+            Arrays.asList(
+                    "+I[101, SCOOTER, Small 2-wheel scooter, 3.14]",
+                    "+I[102, CAR BATTERY, 12V car battery, 8.1]",
+                    "+I[103, 12-PACK DRILL BITS, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                    "+I[104, HAMMER, 12oz carpenter's hammer, 0.75]",
+                    "+I[105, HAMMER, 14oz carpenter's hammer, 0.875]",
+                    "+I[106, HAMMER, 16oz carpenter's hammer, 1.0]",
+                    "+I[107, ROCKS, box of assorted rocks, 5.3]",
+                    "+I[108, JACKET, water resistent black wind breaker, 0.1]",
+                    "+I[109, SPARE TIRE, 24 inch spare tire, 22.2]",
+                    "-D[106, HAMMER, 16oz carpenter's hammer, 1.0]", // -U
+                    "+I[106, HAMMER, 18oz carpenter hammer, 1.0]", // +U
+                    "-D[107, ROCKS, box of assorted rocks, 5.3]", // -U
+                    "+I[107, ROCKS, box of assorted rocks, 5.1]", // +U
+                    "+I[110, JACKET, water resistent white wind breaker, 0.2]",
+                    "+I[111, SCOOTER, Big 2-wheel scooter , 5.18]",
+                    "-D[110, JACKET, water resistent white wind breaker, 0.2]", // -U
+                    "+I[110, JACKET, new water resistent white wind breaker, 0.5]", // +U
+                    "-D[111, SCOOTER, Big 2-wheel scooter , 5.18]", // -U
+                    "+I[111, SCOOTER, Big 2-wheel scooter , 5.17]", // +U
+                    "-D[111, SCOOTER, Big 2-wheel scooter , 5.17]");
+
+    private File source;
+    private File sink;
+
+    private static byte[] readBytes(String resource) throws IOException {
+        final URL url = OggJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;
+        Path path = new File(url.getFile()).toPath();
+        return Files.readAllBytes(path);
+    }
+
+    private void prepareTables(boolean isPartition) throws IOException {
+        byte[] bytes = readBytes("ogg-data.txt");
+        source = TEMPORARY_FOLDER.newFolder();
+        File file;
+        if (isPartition) {
+            File partition = new File(source, "p=1");
+            partition.mkdirs();
+            file = new File(partition, "my_file");
+        } else {
+            file = new File(source, "my_file");
+        }
+        file.createNewFile();
+        Files.write(file.toPath(), bytes);
+
+        sink = TEMPORARY_FOLDER.newFolder();
+
+        env().setParallelism(1);
+    }
+
+    private void createTable(boolean isSink, String path, boolean isPartition) {
+        tEnv().executeSql(
+                        format("create table %s (", isSink ? "sink" : "source")
+                                + "id int, name string,"
+                                + (isSink ? "upper_name string," : "")
+                                + " description string, weight float"
+                                + (isPartition ? ", p int) partitioned by (p) " : ")")
+                                + " with ("
+                                + "'connector'='filesystem',"
+                                + "'format'='debezium-json',"
+                                + format("'path'='%s'", path)
+                                + ")");
+    }
+
+    @Test
+    public void testNonPartition() throws Exception {
+        prepareTables(false);
+        createTable(false, source.toURI().toString(), false);
+        createTable(true, sink.toURI().toString(), false);
+
+        tEnv().executeSql(
+                        "insert into sink select id,name,UPPER(name),description,weight from source")
+                .await();
+        CloseableIterator<Row> iter =
+                tEnv().executeSql("select id,upper_name,description,weight from sink").collect();
+
+        List<String> results =
+                CollectionUtil.iteratorToList(iter).stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        iter.close();
+
+        Assert.assertEquals(EXPECTED, results);
+    }
+
+    @Test
+    public void testPartition() throws Exception {
+        prepareTables(true);
+        createTable(false, source.toURI().toString(), true);
+        createTable(true, sink.toURI().toString(), true);
+
+        tEnv().executeSql(
+                        "insert into sink select id,name,UPPER(name),description,weight,p from source")
+                .await();
+        CloseableIterator<Row> iter =
+                tEnv().executeSql("select id,upper_name,description,weight,p from sink").collect();
+        List<Row> list = CollectionUtil.iteratorToList(iter);
+        iter.close();
+
+        List<String> results =
+                list.stream()
+                        .map(row -> Row.project(row, new int[] {0, 1, 2, 3}))
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+
+        Assert.assertEquals(EXPECTED, results);
+
+        // check partition value
+        for (Row row : list) {
+            Assert.assertEquals(1, row.getField(4));
+        }
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
new file mode 100644
index 0000000..fd669a1
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link OggJsonFormatFactory}. */
+public class OggJsonFormatFactoryTest extends TestLogger {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testSeDeSchema() {
+        final Map<String, String> options = getAllOptions();
+
+        final OggJsonSerializationSchema expectedSer =
+                new OggJsonSerializationSchema(
+                        (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
+                        TimestampFormat.ISO_8601,
+                        JsonFormatOptions.MapNullKeyMode.LITERAL,
+                        "null",
+                        true);
+
+        final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
+        assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> actualSer =
+                sinkMock.valueFormat.createRuntimeEncoder(
+                        new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
+
+        assertEquals(expectedSer, actualSer);
+    }
+
+    @Test
+    public void testInvalidIgnoreParseError() {
+        thrown.expect(
+                containsCause(
+                        new IllegalArgumentException(
+                                "Unrecognized option for boolean: abc. Expected either true or false(case insensitive)")));
+
+        final Map<String, String> options =
+                getModifiedOptions(opts -> opts.put("ogg-json.ignore-parse-errors", "abc"));
+
+        createTableSource(SCHEMA, options);
+    }
+
+    @Test
+    public void testInvalidOptionForTimestampFormat() {
+        final Map<String, String> tableOptions =
+                getModifiedOptions(opts -> opts.put("ogg-json.timestamp-format.standard", "test"));
+
+        thrown.expect(ValidationException.class);
+        thrown.expect(
+                containsCause(
+                        new ValidationException(
+                                "Unsupported value 'test' for timestamp-format.standard. Supported values are [SQL, ISO-8601].")));
+        createTableSource(SCHEMA, tableOptions);
+    }
+
+    @Test
+    public void testInvalidOptionForMapNullKeyMode() {
+        final Map<String, String> tableOptions =
+                getModifiedOptions(opts -> opts.put("ogg-json.map-null-key.mode", "invalid"));
+
+        thrown.expect(ValidationException.class);
+        thrown.expect(
+                containsCause(
+                        new ValidationException(
+                                "Unsupported value 'invalid' for option map-null-key.mode. Supported values are [LITERAL, FAIL, DROP].")));
+        createTableSink(SCHEMA, tableOptions);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Returns the full options modified by the given consumer {@code optionModifier}.
+     *
+     * @param optionModifier Consumer to modify the options
+     */
+    private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+        Map<String, String> options = getAllOptions();
+        optionModifier.accept(options);
+        return options;
+    }
+
+    private Map<String, String> getAllOptions() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+        options.put("target", "MyTarget");
+        options.put("buffer-size", "1000");
+
+        options.put("format", "ogg-json");
+        options.put("ogg-json.ignore-parse-errors", "true");
+        options.put("ogg-json.timestamp-format.standard", "ISO-8601");
+        options.put("ogg-json.map-null-key.mode", "LITERAL");
+        options.put("ogg-json.map-null-key.literal", "null");
+        options.put("ogg-json.encode.decimal-as-plain-number", "true");
+        return options;
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
new file mode 100644
index 0000000..dc99f58
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Collector;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link OggJsonSerializationSchema} and {@link OggJsonDeserializationSchema}. */
+public class OggJsonSerDeSchemaTest {
+
+    private static final DataType PHYSICAL_DATA_TYPE =
+            ROW(
+                    FIELD("id", INT().notNull()),
+                    FIELD("name", STRING()),
+                    FIELD("description", STRING()),
+                    FIELD("weight", FLOAT()));
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = OggJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;
+        Path path = new File(url.getFile()).toPath();
+        return Files.readAllLines(path);
+    }
+
+    @Test
+    public void testSerializationAndDeserialization() throws Exception {
+        testSerializationDeserialization("ogg-data.txt");
+    }
+
+    @Test
+    public void testTombstoneMessages() throws Exception {
+        OggJsonDeserializationSchema deserializationSchema =
+                new OggJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        Collections.emptyList(),
+                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+                        false,
+                        TimestampFormat.ISO_8601);
+        SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize(new byte[] {}, collector);
+        assertTrue(collector.list.isEmpty());
+    }
+
+    private void testSerializationDeserialization(String resourceFile) throws Exception {
+        List<String> lines = readLines(resourceFile);
+        OggJsonDeserializationSchema deserializationSchema =
+                new OggJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        Collections.emptyList(),
+                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+                        false,
+                        TimestampFormat.ISO_8601);
+
+        SimpleCollector collector = new SimpleCollector();
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        // Ogg captures change data (`ogg-data.txt`) on the `product`
+        // table:
+        //
+        // CREATE TABLE product (
+        //  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+        //  name VARCHAR(255),
+        //  description VARCHAR(512),
+        //  weight FLOAT
+        // );
+        // ALTER TABLE product AUTO_INCREMENT = 101;
+        //
+        // INSERT INTO product
+        // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+        //        (default,"car battery","12V car battery",8.1),
+        //        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40
+        // to #3",0.8),
+        //        (default,"hammer","12oz carpenter's hammer",0.75),
+        //        (default,"hammer","14oz carpenter's hammer",0.875),
+        //        (default,"hammer","16oz carpenter's hammer",1.0),
+        //        (default,"rocks","box of assorted rocks",5.3),
+        //        (default,"jacket","water resistent black wind breaker",0.1),
+        //        (default,"spare tire","24 inch spare tire",22.2);
+        // UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
+        // UPDATE product SET weight='5.1' WHERE id=107;
+        // INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
+        // INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
+        // UPDATE product SET description='new water resistent white wind breaker', weight='0.5'
+        // WHERE id=110;
+        // UPDATE product SET weight='5.17' WHERE id=111;
+        // DELETE FROM product WHERE id=111;
+        List<String> expected =
+                Arrays.asList(
+                        "+I(101,scooter,Small 2-wheel scooter,3.14)",
+                        "+I(102,car battery,12V car battery,8.1)",
+                        "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
+                        "+I(104,hammer,12oz carpenter's hammer,0.75)",
+                        "+I(105,hammer,14oz carpenter's hammer,0.875)",
+                        "+I(106,hammer,16oz carpenter's hammer,1.0)",
+                        "+I(107,rocks,box of assorted rocks,5.3)",
+                        "+I(108,jacket,water resistent black wind breaker,0.1)",
+                        "+I(109,spare tire,24 inch spare tire,22.2)",
+                        "-U(106,hammer,16oz carpenter's hammer,1.0)",
+                        "+U(106,hammer,18oz carpenter hammer,1.0)",
+                        "-U(107,rocks,box of assorted rocks,5.3)",
+                        "+U(107,rocks,box of assorted rocks,5.1)",
+                        "+I(110,jacket,water resistent white wind breaker,0.2)",
+                        "+I(111,scooter,Big 2-wheel scooter ,5.18)",
+                        "-U(110,jacket,water resistent white wind breaker,0.2)",
+                        "+U(110,jacket,new water resistent white wind breaker,0.5)",
+                        "-U(111,scooter,Big 2-wheel scooter ,5.18)",
+                        "+U(111,scooter,Big 2-wheel scooter ,5.17)",
+                        "-D(111,scooter,Big 2-wheel scooter ,5.17)");
+        List<String> actual =
+                collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        OggJsonSerializationSchema serializationSchema =
+                new OggJsonSerializationSchema(
+                        (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
+                        TimestampFormat.SQL,
+                        JsonFormatOptions.MapNullKeyMode.LITERAL,
+                        "null",
+                        true);
+
+        serializationSchema.open(null);
+        actual = new ArrayList<>();
+        for (RowData rowData : collector.list) {
+            actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+        }
+
+        expected =
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op_type\":\"I\"}",
+                        "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op_type\":\"D\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op_type\":\"I\"}",
+                        "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op_type\":\"D\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op_type\":\"I\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op_type\":\"I\"}",
+                        "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op_type\":\"D\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op_type\":\"I\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op_type\":\"D\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op_type\":\"I\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op_type\":\"D\"}");
+        assertEquals(expected, actual);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    private void testDeserializationWithMetadata(
+            String resourceFile, Consumer<RowData> testConsumer) throws Exception {
+        // we only read the first line for keeping the test simple
+        final String firstLine = readLines(resourceFile).get(0);
+
+        final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata.stream()
+                                .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                                .collect(Collectors.toList()));
+
+        final OggJsonDeserializationSchema deserializationSchema =
+                new OggJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata,
+                        InternalTypeInfo.of(producedDataType.getLogicalType()),
+                        false,
+                        TimestampFormat.ISO_8601);
+
+        final SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(firstLine.getBytes(StandardCharsets.UTF_8), collector);
+
+        assertEquals(1, collector.list.size());
+        testConsumer.accept(collector.list.get(0));
+    }
+
+    private static class SimpleCollector implements Collector<RowData> {
+
+        private final List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            list.add(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
new file mode 100644
index 0000000..04b4953
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
@@ -0,0 +1,16 @@
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","priamry_keys":["id"],"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I","op_ts":"2020-05-13 15:40:06.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000144","priamry_keys":["id"],"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000145","priamry_keys":["id"],"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000146","priamry_keys":["id"],"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000147","priamry_keys":["id"],"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000148","priamry_keys":["id"],"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000149","priamry_keys":["id"],"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000150","priamry_keys":["id"],"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000151","priamry_keys":["id"],"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000152","priamry_keys":["id"],"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op_type":"U","op_ts":"2020-05-13 17:26:27.936000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000153","priamry_keys":["id"],"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op_type":"U","op_ts":"2020-05-13 17:28:19.505000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000154","priamry_keys":["id"],"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","op_ts":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op_type":"I","op_ts":"2020-05-13 17:30:10.230000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000155","priamry_keys":["id"],"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op_type":"I","op_ts":"2020-05-13 17:30:43.428000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000156","priamry_keys":["id"],"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op_type":"U","op_ts":"2020-05-13 17:32:20.327000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000157","priamry_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op_type":"U","op_ts":"2020-05-13 17:32:10.904000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000158","priamry_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"op_type":"D","op_ts":"2020-05-13 17:32:24.455000"}