[FLINK-24745][format][json] Support Oracle OGG json format
This closes #17657.
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java
new file mode 100644
index 0000000..0329129
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.ogg.OggJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Ogg using JSON encoding. */
+public class OggJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Ogg-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final boolean ignoreParseErrors;
+    private final TimestampFormat timestampFormat;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    public OggJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType physicalDataType) {
+
+ final List<ReadableMetadata> readableMetadata =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+
+ final List<DataTypes.Field> metadataFields =
+ readableMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList());
+
+ final DataType producedDataType =
+ DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+
+ final TypeInformation<RowData> producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+
+ return new OggJsonDeserializationSchema(
+ physicalDataType,
+ readableMetadata,
+ producedTypeInfo,
+ ignoreParseErrors,
+ timestampFormat);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ /** List of metadata that can be read with this format. */
+ enum ReadableMetadata {
+ INGESTION_TIMESTAMP(
+ "current_ts",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ true,
+ DataTypes.FIELD("current_ts", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return TimestampData.fromEpochMillis(row.getLong(pos));
+ }
+ }),
+
+        SOURCE_TIMESTAMP(
+                "op_ts",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                true,
+                // in Ogg JSON, "op_ts" is a top-level string field
+                // such as "2020-05-13 15:40:06.000000"
+                DataTypes.FIELD("op_ts", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return TimestampData.fromTimestamp(
+                                Timestamp.valueOf(row.getString(pos).toString()));
+                    }
+                }),
+
+        SOURCE_TABLE(
+                "table",
+                DataTypes.STRING().nullable(),
+                true,
+                // in Ogg JSON, "table" is a top-level string field such as "OGG.TBL_TEST"
+                DataTypes.FIELD("table", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos);
+                    }
+                }),
+
+ SOURCE_PROPERTIES(
+ "source.properties",
+ // key and value of the map are nullable to make handling easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+ .nullable(),
+ true,
+ DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getMap(pos);
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final boolean isJsonPayload;
+
+ final DataTypes.Field requiredJsonField;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(
+ String key,
+ DataType dataType,
+ boolean isJsonPayload,
+ DataTypes.Field requiredJsonField,
+ MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.isJsonPayload = isJsonPayload;
+ this.requiredJsonField = requiredJsonField;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
new file mode 100644
index 0000000..4a018d5
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * Deserialization schema from Ogg JSON to Flink Table/SQL internal data structure {@link RowData}.
+ * The deserialization schema knows Ogg's schema definition and can extract the database data and
+ * convert it into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
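+ *
+ * <p>For example, an update captured by Ogg is encoded as follows (abridged from the test data in
+ * {@code ogg-data.txt}):
+ *
+ * <pre>{@code
+ * {
+ *   "table": "OGG.TBL_TEST",
+ *   "before": {"id": 111, "name": "scooter", "weight": 5.18},
+ *   "after": {"id": 111, "name": "scooter", "weight": 5.17},
+ *   "op_type": "U",
+ *   "op_ts": "2020-05-13 17:32:10.904000"
+ * }
+ * }</pre>
+ *
+ * <p>This schema turns such a message into an UPDATE_BEFORE and an UPDATE_AFTER {@link RowData}.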
+ *
+ * @see <a href="https://www.oracle.com/cn/middleware/technologies/goldengate/overview.html">Ogg</a>
+ */
+@Internal
+public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private static final String OP_CREATE = "I"; // insert
+ private static final String OP_UPDATE = "U"; // update
+ private static final String OP_DELETE = "D"; // delete
+ private static final String OP_TRUNCATE = "T"; // truncate
+
+    private static final String BEFORE_FIELD_NULL_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "please check whether the Ogg capture process is configured "
+                    + "to write the full before image (e.g. supplemental logging is enabled).";
+
+ /** The deserializer to deserialize Ogg JSON data. */
+ private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+ /** Flag that indicates that an additional projection is required for metadata. */
+ private final boolean hasMetadata;
+
+ /** Metadata to be extracted for every record. */
+ private final MetadataConverter[] metadataConverters;
+
+ /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+ private final boolean ignoreParseErrors;
+
+ public OggJsonDeserializationSchema(
+ DataType physicalDataType,
+ List<ReadableMetadata> requestedMetadata,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat) {
+ final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+ this.jsonDeserializer =
+ new JsonRowDataDeserializationSchema(
+ jsonRowType,
+                        // the result type is never used, so it's fine to pass in the
+                        // produced type info
+                        producedTypeInfo,
+                        // failOnMissingField is always disabled; ignoreParseErrors already
+                        // covers that functionality
+                        false,
+                        ignoreParseErrors,
+ timestampFormat);
+        this.hasMetadata = !requestedMetadata.isEmpty();
+ this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+ this.producedTypeInfo = producedTypeInfo;
+ this.ignoreParseErrors = ignoreParseErrors;
+ }
+
+ private static RowType createJsonRowType(
+ DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+ DataType payload =
+ DataTypes.ROW(
+ DataTypes.FIELD("before", physicalDataType),
+ DataTypes.FIELD("after", physicalDataType),
+ DataTypes.FIELD("op_type", DataTypes.STRING()));
+
+ // append fields that are required for reading metadata in the payload
+ final List<DataTypes.Field> payloadMetadataFields =
+ readableMetadata.stream()
+ .filter(m -> m.isJsonPayload)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
+
+ DataType root = payload;
+
+ // append fields that are required for reading metadata in the root
+ final List<DataTypes.Field> rootMetadataFields =
+ readableMetadata.stream()
+ .filter(m -> !m.isJsonPayload)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
+
+ return (RowType) root.getLogicalType();
+ }
+
+ private static MetadataConverter[] createMetadataConverters(
+ RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+ return requestedMetadata.stream()
+ .map(
+ m -> {
+ if (m.isJsonPayload) {
+ return convertInPayload(jsonRowType, m);
+ } else {
+ return convertInRoot(jsonRowType, m);
+ }
+ })
+ .toArray(MetadataConverter[]::new);
+ }
+
+ private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
+ final int pos = findFieldPos(metadata, jsonRowType);
+ return new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData root, int unused) {
+ return metadata.converter.convert(root, pos);
+ }
+ };
+ }
+
+ private static MetadataConverter convertInPayload(
+ RowType jsonRowType, ReadableMetadata metadata) {
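+        // Ogg JSON is a flat structure without a nested payload, so reading a metadata
+        // field from the payload is the same as reading it from the root.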
+ return convertInRoot(jsonRowType, metadata);
+ }
+
+ private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
+ return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+ }
+
+ @Override
+ public RowData deserialize(byte[] message) {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+ if (message == null || message.length == 0) {
+ // skip tombstone messages
+ return;
+ }
+ try {
+ GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+
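+            // Field layout of the deserialized row (see createJsonRowType):
+            // pos 0 = "before", pos 1 = "after", pos 2 = "op_type",
+            // followed by any requested metadata fields.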
+ GenericRowData before = (GenericRowData) row.getField(0);
+ GenericRowData after = (GenericRowData) row.getField(1);
+ String op = row.getField(2).toString();
+ if (OP_CREATE.equals(op)) {
+ after.setRowKind(RowKind.INSERT);
+ emitRow(row, after, out);
+ } else if (OP_UPDATE.equals(op)) {
+ if (before == null) {
+ throw new IllegalStateException(
+                            String.format(BEFORE_FIELD_NULL_EXCEPTION, "UPDATE"));
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ emitRow(row, before, out);
+ emitRow(row, after, out);
+ } else if (OP_DELETE.equals(op)) {
+ if (before == null) {
+ throw new IllegalStateException(
+                            String.format(BEFORE_FIELD_NULL_EXCEPTION, "DELETE"));
+ }
+ before.setRowKind(RowKind.DELETE);
+ emitRow(row, before, out);
+ } else {
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format(
+ "Unknown \"op_type\" value \"%s\". The Ogg JSON message is '%s'",
+ op, new String(message)));
+ }
+ }
+ } catch (Throwable t) {
+ // a big try catch to protect the processing.
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format("Corrupt Ogg JSON message '%s'.", new String(message)), t);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void emitRow(
+ GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+ // shortcut in case no output projection is required
+ if (!hasMetadata) {
+ out.collect(physicalRow);
+ return;
+ }
+
+ final int physicalArity = physicalRow.getArity();
+ final int metadataArity = metadataConverters.length;
+
+ final GenericRowData producedRow =
+ new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+
+ for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+ producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+ }
+
+ for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+ }
+
+ out.collect(producedRow);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OggJsonDeserializationSchema that = (OggJsonDeserializationSchema) o;
+ return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+ && hasMetadata == that.hasMetadata
+ && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+ && ignoreParseErrors == that.ignoreParseErrors;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jsonDeserializer, hasMetadata, producedTypeInfo, ignoreParseErrors);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Converter that extracts a metadata field from the row (root or payload) that comes out of the
+ * JSON schema and converts it to the desired data type.
+ */
+ interface MetadataConverter extends Serializable {
+
+ // Method for top-level access.
+ default Object convert(GenericRowData row) {
+ return convert(row, -1);
+ }
+
+ Object convert(GenericRowData row, int pos);
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
new file mode 100644
index 0000000..f853983
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonFormatOptionsUtil;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
+import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.TIMESTAMP_FORMAT;
+
+/**
+ * Format factory for providing configured instances of Ogg JSON to RowData {@link
+ * DeserializationSchema} and {@link SerializationSchema}.
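+ *
+ * <p>A table declared with {@code 'format' = 'ogg-json'} uses this factory, e.g. (an illustrative
+ * sketch; the connector options depend on the surrounding table):
+ *
+ * <pre>{@code
+ * CREATE TABLE ogg_source (
+ *   id INT,
+ *   name STRING,
+ *   description STRING,
+ *   weight FLOAT
+ * ) WITH (
+ *   'connector' = 'filesystem',
+ *   'path' = '/path/to/ogg/data',
+ *   'format' = 'ogg-json'
+ * );
+ * }</pre>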
+ */
+@Internal
+public class OggJsonFormatFactory
+ implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "ogg-json";
+
+ /** Validator for ogg decoding format. */
+ private static void validateDecodingFormatOptions(ReadableConfig tableOptions) {
+ JsonFormatOptionsUtil.validateDecodingFormatOptions(tableOptions);
+ }
+
+ /** Validator for ogg encoding format. */
+ private static void validateEncodingFormatOptions(ReadableConfig tableOptions) {
+ JsonFormatOptionsUtil.validateEncodingFormatOptions(tableOptions);
+ }
+
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateDecodingFormatOptions(formatOptions);
+
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+
+ final TimestampFormat timestampFormat =
+ JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+
+ return new OggJsonDecodingFormat(ignoreParseErrors, timestampFormat);
+ }
+
+ @Override
+ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateEncodingFormatOptions(formatOptions);
+
+ TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
+ JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
+ String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
+
+ final boolean encodeDecimalAsPlainNumber =
+ formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+
+ return new EncodingFormat<SerializationSchema<RowData>>() {
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ @Override
+ public SerializationSchema<RowData> createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType consumedDataType) {
+ final RowType rowType = (RowType) consumedDataType.getLogicalType();
+ return new OggJsonSerializationSchema(
+ rowType,
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber);
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(IGNORE_PARSE_ERRORS);
+ options.add(TIMESTAMP_FORMAT);
+ options.add(JSON_MAP_NULL_KEY_MODE);
+ options.add(JSON_MAP_NULL_KEY_LITERAL);
+ options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ return options;
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java
new file mode 100644
index 0000000..feeca79
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.formats.json.JsonFormatOptions;
+
+/** Option utils for ogg-json format. */
+@PublicEvolving
+public class OggJsonFormatOptions {
+
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+ JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+ public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
+
+ public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE =
+ JsonFormatOptions.MAP_NULL_KEY_MODE;
+
+ public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL =
+ JsonFormatOptions.MAP_NULL_KEY_LITERAL;
+
+ private OggJsonFormatOptions() {}
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
new file mode 100644
index 0000000..2189f86
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Ogg JSON.
+ *
+ * @see <a
+ * href="https://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-F0FA2781-0802-4530-B1F0-5E102B982EC0.htm#GADBD505">Ogg
+ * JSON Message</a>
+ */
+public class OggJsonSerializationSchema implements SerializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private static final StringData OP_INSERT = StringData.fromString("I"); // insert
+ private static final StringData OP_DELETE = StringData.fromString("D"); // delete
+
+    /** The serializer to serialize Ogg JSON data. */
+ private final JsonRowDataSerializationSchema jsonSerializer;
+
+ private transient GenericRowData genericRowData;
+
+ public OggJsonSerializationSchema(
+ RowType rowType,
+ TimestampFormat timestampFormat,
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ boolean encodeDecimalAsPlainNumber) {
+ jsonSerializer =
+ new JsonRowDataSerializationSchema(
+ createJsonRowType(fromLogicalToDataType(rowType)),
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber);
+ }
+
+ private static RowType createJsonRowType(DataType databaseSchema) {
+        // Ogg JSON contains some other information, e.g. "table", "pos", "op_ts",
+        // but we don't need them when serializing.
+ return (RowType)
+ DataTypes.ROW(
+ DataTypes.FIELD("before", databaseSchema),
+ DataTypes.FIELD("after", databaseSchema),
+ DataTypes.FIELD("op_type", DataTypes.STRING()))
+ .getLogicalType();
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ genericRowData = new GenericRowData(3);
+ }
+
+ @Override
+ public byte[] serialize(RowData rowData) {
+ try {
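+            // Ogg JSON has no dedicated update encoding in this envelope: an UPDATE_BEFORE
+            // row is written as a delete ("D") and an UPDATE_AFTER row as an insert ("I"),
+            // so an update becomes a D/I pair in the output.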
+ switch (rowData.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ genericRowData.setField(0, null);
+ genericRowData.setField(1, rowData);
+ genericRowData.setField(2, OP_INSERT);
+ return jsonSerializer.serialize(genericRowData);
+ case UPDATE_BEFORE:
+ case DELETE:
+ genericRowData.setField(0, rowData);
+ genericRowData.setField(1, null);
+ genericRowData.setField(2, OP_DELETE);
+ return jsonSerializer.serialize(genericRowData);
+ default:
+ throw new UnsupportedOperationException(
+ format(
+ "Unsupported operation '%s' for row kind.",
+ rowData.getRowKind()));
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException(format("Could not serialize row '%s'.", rowData), t);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OggJsonSerializationSchema that = (OggJsonSerializationSchema) o;
+ return Objects.equals(jsonSerializer, that.jsonSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jsonSerializer);
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 08f4657..35e647d 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -17,3 +17,4 @@
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory
+org.apache.flink.formats.json.ogg.OggJsonFormatFactory
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java
new file mode 100644
index 0000000..eaac0d3
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/** Test Filesystem connector with Ogg JSON. */
+public class OggJsonFileSystemITCase extends StreamingTestBase {
+
+ private static final List<String> EXPECTED =
+ Arrays.asList(
+ "+I[101, SCOOTER, Small 2-wheel scooter, 3.14]",
+ "+I[102, CAR BATTERY, 12V car battery, 8.1]",
+ "+I[103, 12-PACK DRILL BITS, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, HAMMER, 12oz carpenter's hammer, 0.75]",
+ "+I[105, HAMMER, 14oz carpenter's hammer, 0.875]",
+ "+I[106, HAMMER, 16oz carpenter's hammer, 1.0]",
+ "+I[107, ROCKS, box of assorted rocks, 5.3]",
+ "+I[108, JACKET, water resistent black wind breaker, 0.1]",
+ "+I[109, SPARE TIRE, 24 inch spare tire, 22.2]",
+ "-D[106, HAMMER, 16oz carpenter's hammer, 1.0]", // -U
+ "+I[106, HAMMER, 18oz carpenter hammer, 1.0]", // +U
+ "-D[107, ROCKS, box of assorted rocks, 5.3]", // -U
+ "+I[107, ROCKS, box of assorted rocks, 5.1]", // +U
+ "+I[110, JACKET, water resistent white wind breaker, 0.2]",
+ "+I[111, SCOOTER, Big 2-wheel scooter , 5.18]",
+ "-D[110, JACKET, water resistent white wind breaker, 0.2]", // -U
+ "+I[110, JACKET, new water resistent white wind breaker, 0.5]", // +U
+ "-D[111, SCOOTER, Big 2-wheel scooter , 5.18]", // -U
+ "+I[111, SCOOTER, Big 2-wheel scooter , 5.17]", // +U
+ "-D[111, SCOOTER, Big 2-wheel scooter , 5.17]");
+
+ private File source;
+ private File sink;
+
+ private static byte[] readBytes(String resource) throws IOException {
+        final URL url = OggJsonFileSystemITCase.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllBytes(path);
+ }
+
+ private void prepareTables(boolean isPartition) throws IOException {
+ byte[] bytes = readBytes("ogg-data.txt");
+ source = TEMPORARY_FOLDER.newFolder();
+ File file;
+ if (isPartition) {
+ File partition = new File(source, "p=1");
+ partition.mkdirs();
+ file = new File(partition, "my_file");
+ } else {
+ file = new File(source, "my_file");
+ }
+ file.createNewFile();
+ Files.write(file.toPath(), bytes);
+
+ sink = TEMPORARY_FOLDER.newFolder();
+
+ env().setParallelism(1);
+ }
+
+ private void createTable(boolean isSink, String path, boolean isPartition) {
+ tEnv().executeSql(
+ format("create table %s (", isSink ? "sink" : "source")
+ + "id int, name string,"
+ + (isSink ? "upper_name string," : "")
+ + " description string, weight float"
+ + (isPartition ? ", p int) partitioned by (p) " : ")")
+ + " with ("
+ + "'connector'='filesystem',"
+ + "'format'='debezium-json',"
+ + format("'path'='%s'", path)
+ + ")");
+ }
+
+ @Test
+ public void testNonPartition() throws Exception {
+ prepareTables(false);
+ createTable(false, source.toURI().toString(), false);
+ createTable(true, sink.toURI().toString(), false);
+
+ tEnv().executeSql(
+ "insert into sink select id,name,UPPER(name),description,weight from source")
+ .await();
+ CloseableIterator<Row> iter =
+ tEnv().executeSql("select id,upper_name,description,weight from sink").collect();
+
+ List<String> results =
+ CollectionUtil.iteratorToList(iter).stream()
+ .map(Row::toString)
+ .collect(Collectors.toList());
+ iter.close();
+
+ Assert.assertEquals(EXPECTED, results);
+ }
+
+ @Test
+ public void testPartition() throws Exception {
+ prepareTables(true);
+ createTable(false, source.toURI().toString(), true);
+ createTable(true, sink.toURI().toString(), true);
+
+ tEnv().executeSql(
+ "insert into sink select id,name,UPPER(name),description,weight,p from source")
+ .await();
+ CloseableIterator<Row> iter =
+ tEnv().executeSql("select id,upper_name,description,weight,p from sink").collect();
+ List<Row> list = CollectionUtil.iteratorToList(iter);
+ iter.close();
+
+ List<String> results =
+ list.stream()
+ .map(row -> Row.project(row, new int[] {0, 1, 2, 3}))
+ .map(Row::toString)
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(EXPECTED, results);
+
+ // check partition value
+ for (Row row : list) {
+ Assert.assertEquals(1, row.getField(4));
+ }
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
new file mode 100644
index 0000000..fd669a1
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
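+import org.apache.flink.api.common.serialization.DeserializationSchema;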
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
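+import org.apache.flink.table.connector.source.DynamicTableSource;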
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
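+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;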
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
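+import java.util.Collections;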
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link OggJsonFormatFactory}. */
+public class OggJsonFormatFactoryTest extends TestLogger {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testSeDeSchema() {
+ final Map<String, String> options = getAllOptions();
+
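+        // Deserialization side; mirrors the serialization assertions below. The expected
+        // schema assumes the factory passes the options through unchanged
+        // (ignore-parse-errors=true, ISO-8601 timestamps, no metadata requested).
+        final OggJsonDeserializationSchema expectedDeser =
+                new OggJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        Collections.emptyList(),
+                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+                        true,
+                        TimestampFormat.ISO_8601);
+
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
+        assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+        TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+                sourceMock.valueFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
+
+        assertEquals(expectedDeser, actualDeser);
+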
+ final OggJsonSerializationSchema expectedSer =
+ new OggJsonSerializationSchema(
+ (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ true);
+
+ final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
+ assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema<RowData> actualSer =
+ sinkMock.valueFormat.createRuntimeEncoder(
+ new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
+
+ assertEquals(expectedSer, actualSer);
+ }
+
+ @Test
+ public void testInvalidIgnoreParseError() {
+ thrown.expect(
+ containsCause(
+ new IllegalArgumentException(
+ "Unrecognized option for boolean: abc. Expected either true or false(case insensitive)")));
+
+ final Map<String, String> options =
+ getModifiedOptions(opts -> opts.put("ogg-json.ignore-parse-errors", "abc"));
+
+ createTableSource(SCHEMA, options);
+ }
+
+ @Test
+ public void testInvalidOptionForTimestampFormat() {
+ final Map<String, String> tableOptions =
+ getModifiedOptions(opts -> opts.put("ogg-json.timestamp-format.standard", "test"));
+
+ thrown.expect(ValidationException.class);
+ thrown.expect(
+ containsCause(
+ new ValidationException(
+ "Unsupported value 'test' for timestamp-format.standard. Supported values are [SQL, ISO-8601].")));
+ createTableSource(SCHEMA, tableOptions);
+ }
+
+ @Test
+ public void testInvalidOptionForMapNullKeyMode() {
+ final Map<String, String> tableOptions =
+ getModifiedOptions(opts -> opts.put("ogg-json.map-null-key.mode", "invalid"));
+
+ thrown.expect(ValidationException.class);
+ thrown.expect(
+ containsCause(
+ new ValidationException(
+ "Unsupported value 'invalid' for option map-null-key.mode. Supported values are [LITERAL, FAIL, DROP].")));
+ createTableSink(SCHEMA, tableOptions);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the full options modified by the given consumer {@code optionModifier}.
+ *
+ * @param optionModifier Consumer to modify the options
+ */
+ private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+ Map<String, String> options = getAllOptions();
+ optionModifier.accept(options);
+ return options;
+ }
+
+ private Map<String, String> getAllOptions() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+
+ options.put("format", "ogg-json");
+ options.put("ogg-json.ignore-parse-errors", "true");
+ options.put("ogg-json.timestamp-format.standard", "ISO-8601");
+ options.put("ogg-json.map-null-key.mode", "LITERAL");
+ options.put("ogg-json.map-null-key.literal", "null");
+ options.put("ogg-json.encode.decimal-as-plain-number", "true");
+ return options;
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
new file mode 100644
index 0000000..dc99f58
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Collector;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link OggJsonSerializationSchema} and {@link OggJsonDeserializationSchema}. */
+public class OggJsonSerDeSchemaTest {
+
+ private static final DataType PHYSICAL_DATA_TYPE =
+ ROW(
+ FIELD("id", INT().notNull()),
+ FIELD("name", STRING()),
+ FIELD("description", STRING()),
+ FIELD("weight", FLOAT()));
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static List<String> readLines(String resource) throws IOException {
+ final URL url = OggJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllLines(path);
+ }
+
+ @Test
+ public void testSerializationAndDeserialization() throws Exception {
+ testSerializationDeserialization("ogg-data.txt");
+ }
+
+ @Test
+ public void testTombstoneMessages() throws Exception {
+ OggJsonDeserializationSchema deserializationSchema =
+ new OggJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+ InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+ false,
+ TimestampFormat.ISO_8601);
+ SimpleCollector collector = new SimpleCollector();
+ deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize(new byte[] {}, collector);
+ assertTrue(collector.list.isEmpty());
+ }
+
+ private void testSerializationDeserialization(String resourceFile) throws Exception {
+ List<String> lines = readLines(resourceFile);
+ OggJsonDeserializationSchema deserializationSchema =
+ new OggJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+ InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+ false,
+ TimestampFormat.ISO_8601);
+
+ SimpleCollector collector = new SimpleCollector();
+ for (String line : lines) {
+ deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+ }
+
+ // Ogg captures change data (`ogg-data.txt`) on the `product`
+ // table:
+ //
+ // CREATE TABLE product (
+ // id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ // name VARCHAR(255),
+ // description VARCHAR(512),
+ // weight FLOAT
+ // );
+ // ALTER TABLE product AUTO_INCREMENT = 101;
+ //
+ // INSERT INTO product
+ // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+ // (default,"car battery","12V car battery",8.1),
+ // (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40
+ // to #3",0.8),
+ // (default,"hammer","12oz carpenter's hammer",0.75),
+ // (default,"hammer","14oz carpenter's hammer",0.875),
+ // (default,"hammer","16oz carpenter's hammer",1.0),
+ // (default,"rocks","box of assorted rocks",5.3),
+ // (default,"jacket","water resistent black wind breaker",0.1),
+ // (default,"spare tire","24 inch spare tire",22.2);
+ // UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
+ // UPDATE product SET weight='5.1' WHERE id=107;
+ // INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
+ // INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
+ // UPDATE product SET description='new water resistent white wind breaker', weight='0.5'
+ // WHERE id=110;
+ // UPDATE product SET weight='5.17' WHERE id=111;
+ // DELETE FROM product WHERE id=111;
+ List<String> expected =
+ Arrays.asList(
+ "+I(101,scooter,Small 2-wheel scooter,3.14)",
+ "+I(102,car battery,12V car battery,8.1)",
+ "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
+ "+I(104,hammer,12oz carpenter's hammer,0.75)",
+ "+I(105,hammer,14oz carpenter's hammer,0.875)",
+ "+I(106,hammer,16oz carpenter's hammer,1.0)",
+ "+I(107,rocks,box of assorted rocks,5.3)",
+ "+I(108,jacket,water resistent black wind breaker,0.1)",
+ "+I(109,spare tire,24 inch spare tire,22.2)",
+ "-U(106,hammer,16oz carpenter's hammer,1.0)",
+ "+U(106,hammer,18oz carpenter hammer,1.0)",
+ "-U(107,rocks,box of assorted rocks,5.3)",
+ "+U(107,rocks,box of assorted rocks,5.1)",
+ "+I(110,jacket,water resistent white wind breaker,0.2)",
+ "+I(111,scooter,Big 2-wheel scooter ,5.18)",
+ "-U(110,jacket,water resistent white wind breaker,0.2)",
+ "+U(110,jacket,new water resistent white wind breaker,0.5)",
+ "-U(111,scooter,Big 2-wheel scooter ,5.18)",
+ "+U(111,scooter,Big 2-wheel scooter ,5.17)",
+ "-D(111,scooter,Big 2-wheel scooter ,5.17)");
+ List<String> actual =
+ collector.list.stream().map(Object::toString).collect(Collectors.toList());
+ assertEquals(expected, actual);
+
+ OggJsonSerializationSchema serializationSchema =
+ new OggJsonSerializationSchema(
+ (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
+ TimestampFormat.SQL,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ true);
+
+ serializationSchema.open(null);
+ actual = new ArrayList<>();
+ for (RowData rowData : collector.list) {
+ actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+ }
+
+ expected =
+ Arrays.asList(
+ "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op_type\":\"I\"}",
+ "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op_type\":\"D\"}",
+ "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op_type\":\"I\"}",
+ "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op_type\":\"D\"}",
+ "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op_type\":\"I\"}",
+ "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op_type\":\"I\"}",
+ "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op_type\":\"D\"}",
+ "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op_type\":\"I\"}",
+ "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op_type\":\"D\"}",
+ "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op_type\":\"I\"}",
+ "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op_type\":\"D\"}");
+ assertEquals(expected, actual);
+ }
+
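+    @Test
+    public void testDeserializationWithMetadata() throws Exception {
+        // A minimal check that exercises the metadata helper below: the first record of
+        // ogg-data.txt carries table "OGG.TBL_TEST"; the requested metadata columns are
+        // appended after the four physical columns in the order of ReadableMetadata.values(),
+        // so "table" ends up at position 6.
+        testDeserializationWithMetadata(
+                "ogg-data.txt",
+                row -> {
+                    assertEquals(101, row.getInt(0));
+                    assertEquals("OGG.TBL_TEST", row.getString(6).toString());
+                });
+    }
+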
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ private void testDeserializationWithMetadata(
+ String resourceFile, Consumer<RowData> testConsumer) throws Exception {
+ // we only read the first line for keeping the test simple
+ final String firstLine = readLines(resourceFile).get(0);
+
+ final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
+
+ final DataType producedDataType =
+ DataTypeUtils.appendRowFields(
+ PHYSICAL_DATA_TYPE,
+ requestedMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList()));
+
+ final OggJsonDeserializationSchema deserializationSchema =
+ new OggJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE,
+ requestedMetadata,
+ InternalTypeInfo.of(producedDataType.getLogicalType()),
+ false,
+ TimestampFormat.ISO_8601);
+
+ final SimpleCollector collector = new SimpleCollector();
+ deserializationSchema.deserialize(firstLine.getBytes(StandardCharsets.UTF_8), collector);
+
+ assertEquals(1, collector.list.size());
+ testConsumer.accept(collector.list.get(0));
+ }
+
+ private static class SimpleCollector implements Collector<RowData> {
+
+ private final List<RowData> list = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ list.add(record);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
new file mode 100644
index 0000000..04b4953
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
@@ -0,0 +1,16 @@
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","priamry_keys":["id"],"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I","op_ts":"2020-05-13 15:40:06.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000144","priamry_keys":["id"],"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000145","priamry_keys":["id"],"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000146","priamry_keys":["id"],"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000147","priamry_keys":["id"],"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000148","priamry_keys":["id"],"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000149","priamry_keys":["id"],"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000150","priamry_keys":["id"],"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000151","priamry_keys":["id"],"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000152","priamry_keys":["id"],"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op_type":"U","op_ts":"2020-05-13 17:26:27.936000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000153","priamry_keys":["id"],"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op_type":"U","op_ts":"2020-05-13 17:28:19.505000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000154","priamry_keys":["id"],"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","op_ts":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op_type":"I","op_ts":"2020-05-13 17:30:10.230000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000155","priamry_keys":["id"],"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op_type":"I","op_ts":"2020-05-13 17:30:43.428000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000156","priamry_keys":["id"],"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op_type":"U","op_ts":"2020-05-13 17:32:20.327000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000157","priamry_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op_type":"U","op_ts":"2020-05-13 17:32:10.904000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000158","priamry_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"op_type":"D","op_ts":"2020-05-13 17:32:24.455000"}