[FLINK-31747] remove debezium json for now

This closes #16.
diff --git a/flink-formats-kafka/flink-json-debezium/archunit-violations/24119078-1071-4906-b2ac-ed57c8154eaa b/flink-formats-kafka/flink-json-debezium/archunit-violations/24119078-1071-4906-b2ac-ed57c8154eaa
deleted file mode 100644
index e69de29..0000000
--- a/flink-formats-kafka/flink-json-debezium/archunit-violations/24119078-1071-4906-b2ac-ed57c8154eaa
+++ /dev/null
diff --git a/flink-formats-kafka/flink-json-debezium/archunit-violations/62c5e4e5-2b0e-41ed-a268-ee33d5edd162 b/flink-formats-kafka/flink-json-debezium/archunit-violations/62c5e4e5-2b0e-41ed-a268-ee33d5edd162
deleted file mode 100644
index e69de29..0000000
--- a/flink-formats-kafka/flink-json-debezium/archunit-violations/62c5e4e5-2b0e-41ed-a268-ee33d5edd162
+++ /dev/null
diff --git a/flink-formats-kafka/flink-json-debezium/archunit-violations/stored.rules b/flink-formats-kafka/flink-json-debezium/archunit-violations/stored.rules
deleted file mode 100644
index d43a144..0000000
--- a/flink-formats-kafka/flink-json-debezium/archunit-violations/stored.rules
+++ /dev/null
@@ -1,4 +0,0 @@
-#
-#Mon Apr 04 17:11:32 CEST 2022
-Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=24119078-1071-4906-b2ac-ed57c8154eaa
-ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=62c5e4e5-2b0e-41ed-a268-ee33d5edd162
diff --git a/flink-formats-kafka/flink-json-debezium/pom.xml b/flink-formats-kafka/flink-json-debezium/pom.xml
deleted file mode 100644
index 147cf0e..0000000
--- a/flink-formats-kafka/flink-json-debezium/pom.xml
+++ /dev/null
@@ -1,141 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-formats-kafka</artifactId>
-		<version>4.1-SNAPSHOT</version>
-	</parent>
-
-	<artifactId>flink-json-debezium</artifactId>
-	<name>Flink : Formats : Json Debezium</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${flink.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-common</artifactId>
-			<version>${flink.version}</version>
-			<scope>provided</scope>
-			<optional>true</optional>
-		</dependency>
-
-		<!-- test dependencies -->
-
-		<!-- Required for integration test -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-files</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- JSON table descriptor testing -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-common</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<!-- JSON RowData schema test dependency -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- Json filesystem format factory ITCase test dependency -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<!-- test utils dependency -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-test-utils</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${flink.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- ArchUit test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-architecture-tests-test</artifactId>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet>
-								<includes>
-									<include>org.apache.flink:flink-format-common</include>
-								</includes>
-							</artifactSet>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
deleted file mode 100644
index 706d01a..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.Projection;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** {@link DecodingFormat} for Debezium using JSON encoding. */
-public class DebeziumJsonDecodingFormat
-        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
-
-    // --------------------------------------------------------------------------------------------
-    // Mutable attributes
-    // --------------------------------------------------------------------------------------------
-
-    private List<String> metadataKeys;
-
-    // --------------------------------------------------------------------------------------------
-    // Debezium-specific attributes
-    // --------------------------------------------------------------------------------------------
-
-    private final boolean schemaInclude;
-
-    private final boolean ignoreParseErrors;
-
-    private final TimestampFormat timestampFormat;
-
-    public DebeziumJsonDecodingFormat(
-            boolean schemaInclude, boolean ignoreParseErrors, TimestampFormat timestampFormat) {
-        this.schemaInclude = schemaInclude;
-        this.ignoreParseErrors = ignoreParseErrors;
-        this.timestampFormat = timestampFormat;
-        this.metadataKeys = Collections.emptyList();
-    }
-
-    @Override
-    public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
-        physicalDataType = Projection.of(projections).project(physicalDataType);
-
-        final List<ReadableMetadata> readableMetadata =
-                metadataKeys.stream()
-                        .map(
-                                k ->
-                                        Stream.of(ReadableMetadata.values())
-                                                .filter(rm -> rm.key.equals(k))
-                                                .findFirst()
-                                                .orElseThrow(IllegalStateException::new))
-                        .collect(Collectors.toList());
-
-        final List<DataTypes.Field> metadataFields =
-                readableMetadata.stream()
-                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
-                        .collect(Collectors.toList());
-
-        final DataType producedDataType =
-                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
-
-        final TypeInformation<RowData> producedTypeInfo =
-                context.createTypeInformation(producedDataType);
-
-        return new DebeziumJsonDeserializationSchema(
-                physicalDataType,
-                readableMetadata,
-                producedTypeInfo,
-                schemaInclude,
-                ignoreParseErrors,
-                timestampFormat);
-    }
-
-    @Override
-    public Map<String, DataType> listReadableMetadata() {
-        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
-        Stream.of(ReadableMetadata.values())
-                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
-        return metadataMap;
-    }
-
-    @Override
-    public void applyReadableMetadata(List<String> metadataKeys) {
-        this.metadataKeys = metadataKeys;
-    }
-
-    @Override
-    public ChangelogMode getChangelogMode() {
-        return ChangelogMode.newBuilder()
-                .addContainedKind(RowKind.INSERT)
-                .addContainedKind(RowKind.UPDATE_BEFORE)
-                .addContainedKind(RowKind.UPDATE_AFTER)
-                .addContainedKind(RowKind.DELETE)
-                .build();
-    }
-
-    // --------------------------------------------------------------------------------------------
-    // Metadata handling
-    // --------------------------------------------------------------------------------------------
-
-    /** List of metadata that can be read with this format. */
-    enum ReadableMetadata {
-        SCHEMA(
-                "schema",
-                DataTypes.STRING().nullable(),
-                false,
-                DataTypes.FIELD("schema", DataTypes.STRING()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getString(pos);
-                    }
-                }),
-
-        INGESTION_TIMESTAMP(
-                "ingestion-timestamp",
-                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
-                true,
-                DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        if (row.isNullAt(pos)) {
-                            return null;
-                        }
-                        return TimestampData.fromEpochMillis(row.getLong(pos));
-                    }
-                }),
-
-        SOURCE_TIMESTAMP(
-                "source.timestamp",
-                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        final StringData timestamp =
-                                (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
-                        if (timestamp == null) {
-                            return null;
-                        }
-                        return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
-                    }
-                }),
-
-        SOURCE_DATABASE(
-                "source.database",
-                DataTypes.STRING().nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return readProperty(row, pos, KEY_SOURCE_DATABASE);
-                    }
-                }),
-
-        SOURCE_SCHEMA(
-                "source.schema",
-                DataTypes.STRING().nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return readProperty(row, pos, KEY_SOURCE_SCHEMA);
-                    }
-                }),
-
-        SOURCE_TABLE(
-                "source.table",
-                DataTypes.STRING().nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return readProperty(row, pos, KEY_SOURCE_TABLE);
-                    }
-                }),
-
-        SOURCE_PROPERTIES(
-                "source.properties",
-                // key and value of the map are nullable to make handling easier in queries
-                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
-                        .nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getMap(pos);
-                    }
-                });
-
-        final String key;
-
-        final DataType dataType;
-
-        final boolean isJsonPayload;
-
-        final DataTypes.Field requiredJsonField;
-
-        final MetadataConverter converter;
-
-        ReadableMetadata(
-                String key,
-                DataType dataType,
-                boolean isJsonPayload,
-                DataTypes.Field requiredJsonField,
-                MetadataConverter converter) {
-            this.key = key;
-            this.dataType = dataType;
-            this.isJsonPayload = isJsonPayload;
-            this.requiredJsonField = requiredJsonField;
-            this.converter = converter;
-        }
-    }
-
-    private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");
-
-    private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");
-
-    private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");
-
-    private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
-
-    private static Object readProperty(GenericRowData row, int pos, StringData key) {
-        final GenericMapData map = (GenericMapData) row.getMap(pos);
-        if (map == null) {
-            return null;
-        }
-        return map.get(key);
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
deleted file mode 100644
index cba336d..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import static java.lang.String.format;
-
-/**
- * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure {@link
- * RowData}. The deserialization schema knows Debezium's schema definition and can extract the
- * database data and convert into {@link RowData} with {@link RowKind}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @see <a href="https://debezium.io/">Debezium</a>
- */
-@Internal
-public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
-    private static final long serialVersionUID = 1L;
-
-    private static final String OP_READ = "r"; // snapshot read
-    private static final String OP_CREATE = "c"; // insert
-    private static final String OP_UPDATE = "u"; // update
-    private static final String OP_DELETE = "d"; // delete
-
-    private static final String REPLICA_IDENTITY_EXCEPTION =
-            "The \"before\" field of %s message is null, "
-                    + "if you are using Debezium Postgres Connector, "
-                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
-
-    /** The deserializer to deserialize Debezium JSON data. */
-    private final JsonRowDataDeserializationSchema jsonDeserializer;
-
-    /** Flag that indicates that an additional projection is required for metadata. */
-    private final boolean hasMetadata;
-
-    /** Metadata to be extracted for every record. */
-    private final MetadataConverter[] metadataConverters;
-
-    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
-    private final TypeInformation<RowData> producedTypeInfo;
-
-    /**
-     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
-     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
-     * information, but we just ignore "schema" and extract data from "payload".
-     */
-    private final boolean schemaInclude;
-
-    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
-    private final boolean ignoreParseErrors;
-
-    public DebeziumJsonDeserializationSchema(
-            DataType physicalDataType,
-            List<ReadableMetadata> requestedMetadata,
-            TypeInformation<RowData> producedTypeInfo,
-            boolean schemaInclude,
-            boolean ignoreParseErrors,
-            TimestampFormat timestampFormat) {
-        final RowType jsonRowType =
-                createJsonRowType(physicalDataType, requestedMetadata, schemaInclude);
-        this.jsonDeserializer =
-                new JsonRowDataDeserializationSchema(
-                        jsonRowType,
-                        // the result type is never used, so it's fine to pass in the produced type
-                        // info
-                        producedTypeInfo,
-                        false, // ignoreParseErrors already contains the functionality of
-                        // failOnMissingField
-                        ignoreParseErrors,
-                        timestampFormat);
-        this.hasMetadata = requestedMetadata.size() > 0;
-        this.metadataConverters =
-                createMetadataConverters(jsonRowType, requestedMetadata, schemaInclude);
-        this.producedTypeInfo = producedTypeInfo;
-        this.schemaInclude = schemaInclude;
-        this.ignoreParseErrors = ignoreParseErrors;
-    }
-
-    @Override
-    public void open(InitializationContext context) throws Exception {
-        jsonDeserializer.open(context);
-    }
-
-    @Override
-    public RowData deserialize(byte[] message) {
-        throw new RuntimeException(
-                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
-    }
-
-    @Override
-    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
-        if (message == null || message.length == 0) {
-            // skip tombstone messages
-            return;
-        }
-        try {
-            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
-            GenericRowData payload;
-            if (schemaInclude) {
-                payload = (GenericRowData) row.getField(0);
-            } else {
-                payload = row;
-            }
-
-            GenericRowData before = (GenericRowData) payload.getField(0);
-            GenericRowData after = (GenericRowData) payload.getField(1);
-            String op = payload.getField(2).toString();
-            if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
-                after.setRowKind(RowKind.INSERT);
-                emitRow(row, after, out);
-            } else if (OP_UPDATE.equals(op)) {
-                if (before == null) {
-                    throw new IllegalStateException(
-                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
-                }
-                before.setRowKind(RowKind.UPDATE_BEFORE);
-                after.setRowKind(RowKind.UPDATE_AFTER);
-                emitRow(row, before, out);
-                emitRow(row, after, out);
-            } else if (OP_DELETE.equals(op)) {
-                if (before == null) {
-                    throw new IllegalStateException(
-                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
-                }
-                before.setRowKind(RowKind.DELETE);
-                emitRow(row, before, out);
-            } else {
-                if (!ignoreParseErrors) {
-                    throw new IOException(
-                            format(
-                                    "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
-                                    op, new String(message)));
-                }
-            }
-        } catch (Throwable t) {
-            // a big try catch to protect the processing.
-            if (!ignoreParseErrors) {
-                throw new IOException(
-                        format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
-            }
-        }
-    }
-
-    private void emitRow(
-            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
-        // shortcut in case no output projection is required
-        if (!hasMetadata) {
-            out.collect(physicalRow);
-            return;
-        }
-
-        final int physicalArity = physicalRow.getArity();
-        final int metadataArity = metadataConverters.length;
-
-        final GenericRowData producedRow =
-                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
-
-        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
-            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
-        }
-
-        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
-            producedRow.setField(
-                    physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
-        }
-
-        out.collect(producedRow);
-    }
-
-    @Override
-    public boolean isEndOfStream(RowData nextElement) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return producedTypeInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
-        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && hasMetadata == that.hasMetadata
-                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
-                && schemaInclude == that.schemaInclude
-                && ignoreParseErrors == that.ignoreParseErrors;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                jsonDeserializer, hasMetadata, producedTypeInfo, schemaInclude, ignoreParseErrors);
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    private static RowType createJsonRowType(
-            DataType physicalDataType,
-            List<ReadableMetadata> readableMetadata,
-            boolean schemaInclude) {
-        DataType payload =
-                DataTypes.ROW(
-                        DataTypes.FIELD("before", physicalDataType),
-                        DataTypes.FIELD("after", physicalDataType),
-                        DataTypes.FIELD("op", DataTypes.STRING()));
-
-        // append fields that are required for reading metadata in the payload
-        final List<DataTypes.Field> payloadMetadataFields =
-                readableMetadata.stream()
-                        .filter(m -> m.isJsonPayload)
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
-
-        DataType root = payload;
-        if (schemaInclude) {
-            // when Debezium Kafka connect enables "value.converter.schemas.enable",
-            // the JSON will contain "schema" information and we need to extract data from
-            // "payload".
-            root = DataTypes.ROW(DataTypes.FIELD("payload", payload));
-        }
-
-        // append fields that are required for reading metadata in the root
-        final List<DataTypes.Field> rootMetadataFields =
-                readableMetadata.stream()
-                        .filter(m -> !m.isJsonPayload)
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
-
-        return (RowType) root.getLogicalType();
-    }
-
-    private static MetadataConverter[] createMetadataConverters(
-            RowType jsonRowType, List<ReadableMetadata> requestedMetadata, boolean schemaInclude) {
-        return requestedMetadata.stream()
-                .map(
-                        m -> {
-                            if (m.isJsonPayload) {
-                                return convertInPayload(jsonRowType, m, schemaInclude);
-                            } else {
-                                return convertInRoot(jsonRowType, m);
-                            }
-                        })
-                .toArray(MetadataConverter[]::new);
-    }
-
-    private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
-        final int pos = findFieldPos(metadata, jsonRowType);
-        return new MetadataConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(GenericRowData root, int unused) {
-                return metadata.converter.convert(root, pos);
-            }
-        };
-    }
-
-    private static MetadataConverter convertInPayload(
-            RowType jsonRowType, ReadableMetadata metadata, boolean schemaInclude) {
-        if (schemaInclude) {
-            final int pos = findFieldPos(metadata, (RowType) jsonRowType.getChildren().get(0));
-            return new MetadataConverter() {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                public Object convert(GenericRowData root, int unused) {
-                    final GenericRowData payload = (GenericRowData) root.getField(0);
-                    return metadata.converter.convert(payload, pos);
-                }
-            };
-        }
-        return convertInRoot(jsonRowType, metadata);
-    }
-
-    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
-        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Converter that extracts a metadata field from the row (root or payload) that comes out of the
-     * JSON schema and converts it to the desired data type.
-     */
-    interface MetadataConverter extends Serializable {
-
-        // Method for top-level access.
-        default Object convert(GenericRowData row) {
-            return convert(row, -1);
-        }
-
-        Object convert(GenericRowData row, int pos);
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
deleted file mode 100644
index d72fcd2..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonFormatOptionsUtil;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DeserializationFormatFactory;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.SerializationFormatFactory;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.SCHEMA_INCLUDE;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.TIMESTAMP_FORMAT;
-
-/**
- * Format factory for providing configured instances of Debezium JSON to RowData {@link
- * DeserializationSchema}.
- */
-@Internal
-public class DebeziumJsonFormatFactory
-        implements DeserializationFormatFactory, SerializationFormatFactory {
-
-    public static final String IDENTIFIER = "debezium-json";
-
-    @Override
-    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
-            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-
-        FactoryUtil.validateFactoryOptions(this, formatOptions);
-        validateDecodingFormatOptions(formatOptions);
-
-        final boolean schemaInclude = formatOptions.get(SCHEMA_INCLUDE);
-
-        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
-
-        final TimestampFormat timestampFormat =
-                JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
-
-        return new DebeziumJsonDecodingFormat(schemaInclude, ignoreParseErrors, timestampFormat);
-    }
-
-    @Override
-    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
-            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-
-        FactoryUtil.validateFactoryOptions(this, formatOptions);
-        validateEncodingFormatOptions(formatOptions);
-
-        TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
-        JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
-                JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
-        String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
-
-        final boolean encodeDecimalAsPlainNumber =
-                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
-
-        return new EncodingFormat<SerializationSchema<RowData>>() {
-
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.newBuilder()
-                        .addContainedKind(RowKind.INSERT)
-                        .addContainedKind(RowKind.UPDATE_BEFORE)
-                        .addContainedKind(RowKind.UPDATE_AFTER)
-                        .addContainedKind(RowKind.DELETE)
-                        .build();
-            }
-
-            @Override
-            public SerializationSchema<RowData> createRuntimeEncoder(
-                    DynamicTableSink.Context context, DataType consumedDataType) {
-                final RowType rowType = (RowType) consumedDataType.getLogicalType();
-                return new DebeziumJsonSerializationSchema(
-                        rowType,
-                        timestampFormat,
-                        mapNullKeyMode,
-                        mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
-            }
-        };
-    }
-
-    @Override
-    public String factoryIdentifier() {
-        return IDENTIFIER;
-    }
-
-    @Override
-    public Set<ConfigOption<?>> requiredOptions() {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(SCHEMA_INCLUDE);
-        options.add(IGNORE_PARSE_ERRORS);
-        options.add(TIMESTAMP_FORMAT);
-        options.add(JSON_MAP_NULL_KEY_MODE);
-        options.add(JSON_MAP_NULL_KEY_LITERAL);
-        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
-        return options;
-    }
-
-    /** Validator for debezium decoding format. */
-    private static void validateDecodingFormatOptions(ReadableConfig tableOptions) {
-        JsonFormatOptionsUtil.validateDecodingFormatOptions(tableOptions);
-    }
-
-    /** Validator for debezium encoding format. */
-    private static void validateEncodingFormatOptions(ReadableConfig tableOptions) {
-        JsonFormatOptionsUtil.validateEncodingFormatOptions(tableOptions);
-
-        // validator for {@link SCHEMA_INCLUDE}
-        if (tableOptions.get(SCHEMA_INCLUDE)) {
-            throw new ValidationException(
-                    String.format(
-                            "Debezium JSON serialization doesn't support '%s.%s' option been set to true.",
-                            IDENTIFIER, SCHEMA_INCLUDE.key()));
-        }
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatOptions.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatOptions.java
deleted file mode 100644
index bf338a9..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatOptions.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.formats.json.JsonFormatOptions;
-
-/** Option utils for debezium-json format. */
-@PublicEvolving
-public class DebeziumJsonFormatOptions {
-
-    public static final ConfigOption<Boolean> SCHEMA_INCLUDE =
-            ConfigOptions.key("schema-include")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "When setting up a Debezium Kafka Connect, users can enable "
-                                    + "a Kafka configuration 'value.converter.schemas.enable' to include schema in the message. "
-                                    + "This option indicates the Debezium JSON data include the schema in the message or not. "
-                                    + "Default is false.");
-
-    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
-            JsonFormatOptions.IGNORE_PARSE_ERRORS;
-
-    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
-
-    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE =
-            JsonFormatOptions.MAP_NULL_KEY_MODE;
-
-    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL =
-            JsonFormatOptions.MAP_NULL_KEY_LITERAL;
-
-    private DebeziumJsonFormatOptions() {}
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
deleted file mode 100644
index 0dc9a96..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-
-import java.util.Objects;
-
-import static java.lang.String.format;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-
-/**
- * Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Debezium
- * JSON.
- *
- * @see <a href="https://debezium.io/">Debezium</a>
- */
-public class DebeziumJsonSerializationSchema implements SerializationSchema<RowData> {
-    private static final long serialVersionUID = 1L;
-
-    private static final StringData OP_INSERT = StringData.fromString("c"); // insert
-    private static final StringData OP_DELETE = StringData.fromString("d"); // delete
-
-    /** The serializer to serialize Debezium JSON data. * */
-    private final JsonRowDataSerializationSchema jsonSerializer;
-
-    private transient GenericRowData genericRowData;
-
-    public DebeziumJsonSerializationSchema(
-            RowType rowType,
-            TimestampFormat timestampFormat,
-            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
-        jsonSerializer =
-                new JsonRowDataSerializationSchema(
-                        createJsonRowType(fromLogicalToDataType(rowType)),
-                        timestampFormat,
-                        mapNullKeyMode,
-                        mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
-    }
-
-    @Override
-    public void open(InitializationContext context) throws Exception {
-        jsonSerializer.open(context);
-        genericRowData = new GenericRowData(3);
-    }
-
-    @Override
-    public byte[] serialize(RowData rowData) {
-        try {
-            switch (rowData.getRowKind()) {
-                case INSERT:
-                case UPDATE_AFTER:
-                    genericRowData.setField(0, null);
-                    genericRowData.setField(1, rowData);
-                    genericRowData.setField(2, OP_INSERT);
-                    return jsonSerializer.serialize(genericRowData);
-                case UPDATE_BEFORE:
-                case DELETE:
-                    genericRowData.setField(0, rowData);
-                    genericRowData.setField(1, null);
-                    genericRowData.setField(2, OP_DELETE);
-                    return jsonSerializer.serialize(genericRowData);
-                default:
-                    throw new UnsupportedOperationException(
-                            format(
-                                    "Unsupported operation '%s' for row kind.",
-                                    rowData.getRowKind()));
-            }
-        } catch (Throwable t) {
-            throw new RuntimeException(format("Could not serialize row '%s'.", rowData), t);
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        DebeziumJsonSerializationSchema that = (DebeziumJsonSerializationSchema) o;
-        return Objects.equals(jsonSerializer, that.jsonSerializer);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(jsonSerializer);
-    }
-
-    private static RowType createJsonRowType(DataType databaseSchema) {
-        // Debezium JSON contains some other information, e.g. "source", "ts_ms"
-        // but we don't need them.
-        return (RowType)
-                DataTypes.ROW(
-                                DataTypes.FIELD("before", databaseSchema),
-                                DataTypes.FIELD("after", databaseSchema),
-                                DataTypes.FIELD("op", DataTypes.STRING()))
-                        .getLogicalType();
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
deleted file mode 100644
index 3b83658..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
deleted file mode 100644
index ad61f21..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.architecture;
-
-import org.apache.flink.architecture.common.ImportOptions;
-
-import com.tngtech.archunit.core.importer.ImportOption;
-import com.tngtech.archunit.junit.AnalyzeClasses;
-import com.tngtech.archunit.junit.ArchTest;
-import com.tngtech.archunit.junit.ArchTests;
-
-/** Architecture tests for test code. */
-@AnalyzeClasses(
-        packages = {"org.apache.flink.formats.json"},
-        importOptions = {
-            ImportOption.OnlyIncludeTests.class,
-            ImportOptions.ExcludeScalaImportOption.class,
-            ImportOptions.ExcludeShadedImportOption.class
-        })
-public class TestCodeArchitectureTest {
-
-    @ArchTest
-    public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
deleted file mode 100644
index 0c84351..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.CollectionUtil;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static java.lang.String.format;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test Filesystem connector with DebeziumJson. */
-class DebeziumJsonFileSystemITCase extends StreamingTestBase {
-
-    private static final List<String> EXPECTED =
-            Arrays.asList(
-                    "+I[101, SCOOTER, Small 2-wheel scooter, 3.14]",
-                    "+I[102, CAR BATTERY, 12V car battery, 8.1]",
-                    "+I[103, 12-PACK DRILL BITS, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
-                    "+I[104, HAMMER, 12oz carpenter's hammer, 0.75]",
-                    "+I[105, HAMMER, 14oz carpenter's hammer, 0.875]",
-                    "+I[106, HAMMER, 16oz carpenter's hammer, 1.0]",
-                    "+I[107, ROCKS, box of assorted rocks, 5.3]",
-                    "+I[108, JACKET, water resistent black wind breaker, 0.1]",
-                    "+I[109, SPARE TIRE, 24 inch spare tire, 22.2]",
-                    "-D[106, HAMMER, 16oz carpenter's hammer, 1.0]", // -U
-                    "+I[106, HAMMER, 18oz carpenter hammer, 1.0]", // +U
-                    "-D[107, ROCKS, box of assorted rocks, 5.3]", // -U
-                    "+I[107, ROCKS, box of assorted rocks, 5.1]", // +U
-                    "+I[110, JACKET, water resistent white wind breaker, 0.2]",
-                    "+I[111, SCOOTER, Big 2-wheel scooter , 5.18]",
-                    "-D[110, JACKET, water resistent white wind breaker, 0.2]", // -U
-                    "+I[110, JACKET, new water resistent white wind breaker, 0.5]", // +U
-                    "-D[111, SCOOTER, Big 2-wheel scooter , 5.18]", // -U
-                    "+I[111, SCOOTER, Big 2-wheel scooter , 5.17]", // +U
-                    "-D[111, SCOOTER, Big 2-wheel scooter , 5.17]");
-
-    private File source;
-    private File sink;
-
-    private void prepareTables(boolean isPartition, Path tempSourceDir, Path tempSinkDir)
-            throws IOException {
-        byte[] bytes = readBytes("debezium-data-schema-exclude.txt");
-        source = tempSourceDir.toFile();
-        File file;
-        if (isPartition) {
-            File partition = new File(source, "p=1");
-            partition.mkdirs();
-            file = new File(partition, "my_file");
-        } else {
-            file = new File(source, "my_file");
-        }
-        file.createNewFile();
-        Files.write(file.toPath(), bytes);
-
-        sink = tempSinkDir.toFile();
-
-        env().setParallelism(1);
-    }
-
-    private void createTable(boolean isSink, String path, boolean isPartition) {
-        tEnv().executeSql(
-                        format("create table %s (", isSink ? "sink" : "source")
-                                + "id int, name string,"
-                                + (isSink ? "upper_name string," : "")
-                                + " description string, weight float"
-                                + (isPartition ? ", p int) partitioned by (p) " : ")")
-                                + " with ("
-                                + "'connector'='filesystem',"
-                                + "'format'='debezium-json',"
-                                + format("'path'='%s'", path)
-                                + ")");
-    }
-
-    @Test
-    void testNonPartition(@TempDir Path tempSourceDir, @TempDir Path tempSinkDir) throws Exception {
-        prepareTables(false, tempSourceDir, tempSinkDir);
-        createTable(false, source.toURI().toString(), false);
-        createTable(true, sink.toURI().toString(), false);
-
-        tEnv().executeSql(
-                        "insert into sink select id,name,UPPER(name),description,weight from source")
-                .await();
-        CloseableIterator<Row> iter =
-                tEnv().executeSql("select id,upper_name,description,weight from sink").collect();
-
-        List<String> results =
-                CollectionUtil.iteratorToList(iter).stream()
-                        .map(Row::toString)
-                        .collect(Collectors.toList());
-        iter.close();
-
-        assertThat(results).isEqualTo(EXPECTED);
-    }
-
-    @Test
-    void testPartition(@TempDir Path tempSourceDir, @TempDir Path tempSinkDir) throws Exception {
-        prepareTables(true, tempSourceDir, tempSinkDir);
-        createTable(false, source.toURI().toString(), true);
-        createTable(true, sink.toURI().toString(), true);
-
-        tEnv().executeSql(
-                        "insert into sink select id,name,UPPER(name),description,weight,p from source")
-                .await();
-        CloseableIterator<Row> iter =
-                tEnv().executeSql("select id,upper_name,description,weight,p from sink").collect();
-        List<Row> list = CollectionUtil.iteratorToList(iter);
-        iter.close();
-
-        List<String> results =
-                list.stream()
-                        .map(row -> Row.project(row, new int[] {0, 1, 2, 3}))
-                        .map(Row::toString)
-                        .collect(Collectors.toList());
-
-        assertThat(results).isEqualTo(EXPECTED);
-
-        // check partition value
-        for (Row row : list) {
-            assertThat(row.getField(4)).isEqualTo(1);
-        }
-    }
-
-    private static byte[] readBytes(String resource) throws IOException {
-        final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
-        assert url != null;
-        Path path = new File(url.getFile()).toPath();
-        return Files.readAllBytes(path);
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
deleted file mode 100644
index d000877..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TestDynamicTableFactory;
-import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
-import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Consumer;
-
-import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
-import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
-import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
-import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
-import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for {@link DebeziumJsonFormatFactory}. */
-class DebeziumJsonFormatFactoryTest {
-
-    @Test
-    void testSeDeSchema() {
-        final DebeziumJsonDeserializationSchema expectedDeser =
-                new DebeziumJsonDeserializationSchema(
-                        PHYSICAL_DATA_TYPE,
-                        Collections.emptyList(),
-                        InternalTypeInfo.of(PHYSICAL_TYPE),
-                        false,
-                        true,
-                        TimestampFormat.ISO_8601);
-
-        final Map<String, String> options = getAllOptions();
-
-        final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
-        assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
-        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
-                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
-
-        DeserializationSchema<RowData> actualDeser =
-                scanSourceMock.valueFormat.createRuntimeDecoder(
-                        ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
-
-        assertThat(actualDeser).isEqualTo(expectedDeser);
-
-        final DebeziumJsonSerializationSchema expectedSer =
-                new DebeziumJsonSerializationSchema(
-                        (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
-                        TimestampFormat.ISO_8601,
-                        JsonFormatOptions.MapNullKeyMode.LITERAL,
-                        "null",
-                        true);
-
-        final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
-        assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
-        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
-                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
-
-        SerializationSchema<RowData> actualSer =
-                sinkMock.valueFormat.createRuntimeEncoder(
-                        new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
-
-        assertThat(actualSer).isEqualTo(expectedSer);
-    }
-
-    @Test
-    void testInvalidIgnoreParseError() {
-        final Map<String, String> options =
-                getModifiedOptions(opts -> opts.put("debezium-json.ignore-parse-errors", "abc"));
-
-        assertThatThrownBy(() -> createTableSource(SCHEMA, options))
-                .satisfies(
-                        anyCauseMatches(
-                                IllegalArgumentException.class,
-                                "Unrecognized option for boolean: abc. "
-                                        + "Expected either true or false(case insensitive)"));
-    }
-
-    @Test
-    void testSchemaIncludeOption() {
-        Map<String, String> options = getAllOptions();
-        options.put("debezium-json.schema-include", "true");
-
-        final DebeziumJsonDeserializationSchema expectedDeser =
-                new DebeziumJsonDeserializationSchema(
-                        PHYSICAL_DATA_TYPE,
-                        Collections.emptyList(),
-                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
-                        true,
-                        true,
-                        TimestampFormat.ISO_8601);
-        final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
-        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
-                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
-        DeserializationSchema<RowData> actualDeser =
-                scanSourceMock.valueFormat.createRuntimeDecoder(
-                        ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
-        assertThat(actualDeser).isEqualTo(expectedDeser);
-
-        assertThatThrownBy(
-                        () -> {
-                            final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
-                            TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
-                                    (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
-                            sinkMock.valueFormat.createRuntimeEncoder(
-                                    new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
-                        })
-                .satisfies(
-                        anyCauseMatches(
-                                RuntimeException.class,
-                                "Debezium JSON serialization doesn't support "
-                                        + "'debezium-json.schema-include' option been set to true."));
-    }
-
-    @Test
-    void testInvalidOptionForTimestampFormat() {
-        final Map<String, String> tableOptions =
-                getModifiedOptions(
-                        opts -> opts.put("debezium-json.timestamp-format.standard", "test"));
-
-        assertThatThrownBy(() -> createTableSource(SCHEMA, tableOptions))
-                .isInstanceOf(ValidationException.class)
-                .satisfies(
-                        anyCauseMatches(
-                                ValidationException.class,
-                                "Unsupported value 'test' for timestamp-format.standard. "
-                                        + "Supported values are [SQL, ISO-8601]."));
-    }
-
-    @Test
-    void testInvalidOptionForMapNullKeyMode() {
-        final Map<String, String> tableOptions =
-                getModifiedOptions(opts -> opts.put("debezium-json.map-null-key.mode", "invalid"));
-
-        assertThatThrownBy(() -> createTableSink(SCHEMA, tableOptions))
-                .isInstanceOf(ValidationException.class)
-                .satisfies(
-                        anyCauseMatches(
-                                ValidationException.class,
-                                "Unsupported value 'invalid' for option map-null-key.mode. "
-                                        + "Supported values are [LITERAL, FAIL, DROP]."));
-    }
-
-    // ------------------------------------------------------------------------
-    //  Utilities
-    // ------------------------------------------------------------------------
-
-    /**
-     * Returns the full options modified by the given consumer {@code optionModifier}.
-     *
-     * @param optionModifier Consumer to modify the options
-     */
-    private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
-        Map<String, String> options = getAllOptions();
-        optionModifier.accept(options);
-        return options;
-    }
-
-    private Map<String, String> getAllOptions() {
-        final Map<String, String> options = new HashMap<>();
-        options.put("connector", TestDynamicTableFactory.IDENTIFIER);
-        options.put("target", "MyTarget");
-        options.put("buffer-size", "1000");
-
-        options.put("format", "debezium-json");
-        options.put("debezium-json.ignore-parse-errors", "true");
-        options.put("debezium-json.timestamp-format.standard", "ISO-8601");
-        options.put("debezium-json.map-null-key.mode", "LITERAL");
-        options.put("debezium-json.map-null-key.literal", "null");
-        options.put("debezium-json.encode.decimal-as-plain-number", "true");
-        return options;
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
deleted file mode 100644
index 3b9151f..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.util.Collector;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.connector.testutils.formats.SchemaTestUtils.open;
-import static org.apache.flink.table.api.DataTypes.FIELD;
-import static org.apache.flink.table.api.DataTypes.FLOAT;
-import static org.apache.flink.table.api.DataTypes.INT;
-import static org.apache.flink.table.api.DataTypes.ROW;
-import static org.apache.flink.table.api.DataTypes.STRING;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/**
- * Tests for {@link DebeziumJsonSerializationSchema} and {@link DebeziumJsonDeserializationSchema}.
- */
-class DebeziumJsonSerDeSchemaTest {
-
-    private static final DataType PHYSICAL_DATA_TYPE =
-            ROW(
-                    FIELD("id", INT().notNull()),
-                    FIELD("name", STRING()),
-                    FIELD("description", STRING()),
-                    FIELD("weight", FLOAT()));
-
-    @Test
-    void testSerializationAndSchemaIncludeDeserialization() throws Exception {
-        testSerializationDeserialization("debezium-data-schema-include.txt", true);
-    }
-
-    @Test
-    void testSerializationAndSchemaExcludeDeserialization() throws Exception {
-        testSerializationDeserialization("debezium-data-schema-exclude.txt", false);
-    }
-
-    @Test
-    void testSerializationAndPostgresSchemaIncludeDeserialization() throws Exception {
-        testSerializationDeserialization("debezium-postgres-data-schema-include.txt", true);
-    }
-
-    @Test
-    void testSerializationAndPostgresSchemaExcludeDeserialization() throws Exception {
-        testSerializationDeserialization("debezium-postgres-data-schema-exclude.txt", false);
-    }
-
-    @Test
-    void testPostgresDefaultReplicaIdentify() {
-        assertThatThrownBy(
-                        () ->
-                                testSerializationDeserialization(
-                                        "debezium-postgres-data-replica-identity.txt", false))
-                .as(
-                        "The \"before\" field of UPDATE message is null, if you are using Debezium Postgres Connector, "
-                                + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.")
-                .isInstanceOf(Exception.class);
-    }
-
-    @Test
-    void testTombstoneMessages() throws Exception {
-        DebeziumJsonDeserializationSchema deserializationSchema =
-                new DebeziumJsonDeserializationSchema(
-                        PHYSICAL_DATA_TYPE,
-                        Collections.emptyList(),
-                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
-                        false,
-                        false,
-                        TimestampFormat.ISO_8601);
-        SimpleCollector collector = new SimpleCollector();
-        deserializationSchema.deserialize(null, collector);
-        deserializationSchema.deserialize(new byte[] {}, collector);
-        assertThat(collector.list).isEmpty();
-    }
-
-    @Test
-    void testDeserializationWithMetadata() throws Exception {
-        testDeserializationWithMetadata(
-                "debezium-data-schema-include.txt",
-                true,
-                row -> {
-                    assertThat(row.getInt(0)).isEqualTo(101);
-                    assertThat(row.getString(1).toString()).isEqualTo("scooter");
-                    assertThat(row.getString(2).toString()).isEqualTo("Small 2-wheel scooter");
-                    assertThat(row.getFloat(3)).isEqualTo(3.14f);
-                    assertThat(row.getString(4).toString())
-                            .startsWith(
-                                    "{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},");
-                    assertThat(row.getTimestamp(5, 3).getMillisecond()).isEqualTo(1589355606100L);
-                    assertThat(row.getTimestamp(6, 3).getMillisecond()).isEqualTo(0L);
-                    assertThat(row.getString(7).toString()).isEqualTo("inventory");
-                    assertThat(row.isNullAt(8)).isEqualTo(true);
-                    assertThat(row.getString(9).toString()).isEqualTo("products");
-                    assertThat(row.getMap(10).size()).isEqualTo(14);
-                });
-
-        testDeserializationWithMetadata(
-                "debezium-data-schema-exclude.txt",
-                false,
-                row -> {
-                    assertThat(row.getInt(0)).isEqualTo(101);
-                    assertThat(row.getString(1).toString()).isEqualTo("scooter");
-                    assertThat(row.getString(2).toString()).isEqualTo("Small 2-wheel scooter");
-                    assertThat(row.getFloat(3)).isEqualTo(3.14f);
-                    assertThat(row.isNullAt(4)).isEqualTo(true);
-                    assertThat(row.getTimestamp(5, 3).getMillisecond()).isEqualTo(1589355606100L);
-                    assertThat(row.getTimestamp(6, 3).getMillisecond()).isEqualTo(0L);
-                    assertThat(row.getString(7).toString()).isEqualTo("inventory");
-                    assertThat(row.isNullAt(8)).isEqualTo(true);
-                    assertThat(row.getString(9).toString()).isEqualTo("products");
-                    assertThat(row.getMap(10).size()).isEqualTo(14);
-                });
-
-        testDeserializationWithMetadata(
-                "debezium-postgres-data-schema-exclude.txt",
-                false,
-                row -> {
-                    assertThat(row.getInt(0)).isEqualTo(101);
-                    assertThat(row.getString(1).toString()).isEqualTo("scooter");
-                    assertThat(row.getString(2).toString()).isEqualTo("Small 2-wheel scooter");
-                    assertThat(row.getFloat(3)).isEqualTo(3.14f);
-                    assertThat(row.isNullAt(4)).isEqualTo(true);
-                    assertThat(row.getTimestamp(5, 3).getMillisecond()).isEqualTo(1596001099434L);
-                    assertThat(row.getTimestamp(6, 3).getMillisecond()).isEqualTo(1596001099434L);
-                    assertThat(row.getString(7).toString()).isEqualTo("postgres");
-                    assertThat(row.getString(8).toString()).isEqualTo("inventory");
-                    assertThat(row.getString(9).toString()).isEqualTo("products");
-                    assertThat(row.getMap(10).size()).isEqualTo(11);
-                });
-    }
-
-    private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
-            throws Exception {
-        List<String> lines = readLines(resourceFile);
-        DebeziumJsonDeserializationSchema deserializationSchema =
-                new DebeziumJsonDeserializationSchema(
-                        PHYSICAL_DATA_TYPE,
-                        Collections.emptyList(),
-                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
-                        schemaInclude,
-                        false,
-                        TimestampFormat.ISO_8601);
-        open(deserializationSchema);
-
-        SimpleCollector collector = new SimpleCollector();
-        for (String line : lines) {
-            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
-        }
-
-        // Debezium captures change data (`debezium-data-schema-include.txt`) on the `product`
-        // table:
-        //
-        // CREATE TABLE product (
-        //  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-        //  name VARCHAR(255),
-        //  description VARCHAR(512),
-        //  weight FLOAT
-        // );
-        // ALTER TABLE product AUTO_INCREMENT = 101;
-        //
-        // INSERT INTO product
-        // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
-        //        (default,"car battery","12V car battery",8.1),
-        //        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40
-        // to #3",0.8),
-        //        (default,"hammer","12oz carpenter's hammer",0.75),
-        //        (default,"hammer","14oz carpenter's hammer",0.875),
-        //        (default,"hammer","16oz carpenter's hammer",1.0),
-        //        (default,"rocks","box of assorted rocks",5.3),
-        //        (default,"jacket","water resistent black wind breaker",0.1),
-        //        (default,"spare tire","24 inch spare tire",22.2);
-        // UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
-        // UPDATE product SET weight='5.1' WHERE id=107;
-        // INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
-        // INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
-        // UPDATE product SET description='new water resistent white wind breaker', weight='0.5'
-        // WHERE id=110;
-        // UPDATE product SET weight='5.17' WHERE id=111;
-        // DELETE FROM product WHERE id=111;
-        List<String> expected =
-                Arrays.asList(
-                        "+I(101,scooter,Small 2-wheel scooter,3.14)",
-                        "+I(102,car battery,12V car battery,8.1)",
-                        "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
-                        "+I(104,hammer,12oz carpenter's hammer,0.75)",
-                        "+I(105,hammer,14oz carpenter's hammer,0.875)",
-                        "+I(106,hammer,16oz carpenter's hammer,1.0)",
-                        "+I(107,rocks,box of assorted rocks,5.3)",
-                        "+I(108,jacket,water resistent black wind breaker,0.1)",
-                        "+I(109,spare tire,24 inch spare tire,22.2)",
-                        "-U(106,hammer,16oz carpenter's hammer,1.0)",
-                        "+U(106,hammer,18oz carpenter hammer,1.0)",
-                        "-U(107,rocks,box of assorted rocks,5.3)",
-                        "+U(107,rocks,box of assorted rocks,5.1)",
-                        "+I(110,jacket,water resistent white wind breaker,0.2)",
-                        "+I(111,scooter,Big 2-wheel scooter ,5.18)",
-                        "-U(110,jacket,water resistent white wind breaker,0.2)",
-                        "+U(110,jacket,new water resistent white wind breaker,0.5)",
-                        "-U(111,scooter,Big 2-wheel scooter ,5.18)",
-                        "+U(111,scooter,Big 2-wheel scooter ,5.17)",
-                        "-D(111,scooter,Big 2-wheel scooter ,5.17)");
-        List<String> actual =
-                collector.list.stream().map(Object::toString).collect(Collectors.toList());
-        assertThat(actual).isEqualTo(expected);
-
-        DebeziumJsonSerializationSchema serializationSchema =
-                new DebeziumJsonSerializationSchema(
-                        (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
-                        TimestampFormat.SQL,
-                        JsonFormatOptions.MapNullKeyMode.LITERAL,
-                        "null",
-                        true);
-
-        open(serializationSchema);
-        actual = new ArrayList<>();
-        for (RowData rowData : collector.list) {
-            actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
-        }
-
-        expected =
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
-                        "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
-                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
-                        "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
-                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
-                        "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
-                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
-                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
-                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
-                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
-        assertThat(actual).isEqualTo(expected);
-    }
-
-    private void testDeserializationWithMetadata(
-            String resourceFile, boolean schemaInclude, Consumer<RowData> testConsumer)
-            throws Exception {
-        // we only read the first line for keeping the test simple
-        final String firstLine = readLines(resourceFile).get(0);
-
-        final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
-
-        final DataType producedDataType =
-                DataTypeUtils.appendRowFields(
-                        PHYSICAL_DATA_TYPE,
-                        requestedMetadata.stream()
-                                .map(m -> DataTypes.FIELD(m.key, m.dataType))
-                                .collect(Collectors.toList()));
-
-        final DebeziumJsonDeserializationSchema deserializationSchema =
-                new DebeziumJsonDeserializationSchema(
-                        PHYSICAL_DATA_TYPE,
-                        requestedMetadata,
-                        InternalTypeInfo.of(producedDataType.getLogicalType()),
-                        schemaInclude,
-                        false,
-                        TimestampFormat.ISO_8601);
-        open(deserializationSchema);
-
-        final SimpleCollector collector = new SimpleCollector();
-        deserializationSchema.deserialize(firstLine.getBytes(StandardCharsets.UTF_8), collector);
-
-        assertThat(collector.list).hasSize(1);
-        assertThat(collector.list.get(0)).satisfies(testConsumer);
-    }
-
-    // --------------------------------------------------------------------------------------------
-    // Utilities
-    // --------------------------------------------------------------------------------------------
-
-    private static List<String> readLines(String resource) throws IOException {
-        final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
-        assert url != null;
-        Path path = new File(url.getFile()).toPath();
-        return Files.readAllLines(path);
-    }
-
-    private static class SimpleCollector implements Collector<RowData> {
-
-        private List<RowData> list = new ArrayList<>();
-
-        @Override
-        public void collect(RowData record) {
-            list.add(record);
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-    }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats-kafka/flink-json-debezium/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
deleted file mode 100644
index 2899913..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/archunit.properties b/flink-formats-kafka/flink-json-debezium/src/test/resources/archunit.properties
deleted file mode 100644
index 15be88c..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/archunit.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# By default we allow removing existing violations, but fail when new violations are added.
-freeze.store.default.allowStoreUpdate=true
-
-# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
-#freeze.store.default.allowStoreCreation=true
-
-# Enable this to add allow new violations to be recorded.
-# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
-#       violation, please try to avoid creating the violation. If the violation was created due to a
-#       shortcoming of the rule, file a JIRA issue so the rule can be improved.
-#freeze.refreeze=true
-
-freeze.store.default.path=archunit-violations
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data-filter-table.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data-filter-table.txt
deleted file mode 100644
index 68661d7..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data-filter-table.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"}
-{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"mydb","es":1598944202000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"description":null}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944202218,"type":"UPDATE"}
-{"data":null,"database":"mydb","es":1598944271000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE orders (\n  order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n  order_date DATE NOT NULL,\n  purchaser INTEGER NOT NULL,\n  quantity INTEGER NOT NULL,\n  product_id INTEGER NOT NULL\n) AUTO_INCREMENT = 10001","sqlType":null,"table":"orders","ts":1598944271192,"type":"CREATE"}
-{"data":[{"order_number":"10001","order_date":"2016-01-16","purchaser":"1001","quantity":"1","product_id":"102"},{"order_number":"10002","order_date":"2016-01-17","purchaser":"1002","quantity":"2","product_id":"105"},{"order_number":"10003","order_date":"2016-02-19","purchaser":"1002","quantity":"2","product_id":"106"},{"order_number":"10004","order_date":"2016-02-21","purchaser":"1003","quantity":"1","product_id":"107"}],"database":"mydb","es":1598944275000,"id":4,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":null,"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944275018,"type":"INSERT"}
-{"data":[{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.1"}],"database":"mydb","es":1598944279000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"5.3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944279665,"type":"UPDATE"}
-{"data":[{"id":"110","name":"jacket","description":"water resistent white wind breaker","weight":"0.2"}],"database":"mydb","es":1598944288000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944288394,"type":"INSERT"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.18"}],"database":"mydb","es":1598944288000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944288394,"type":"INSERT"}
-{"data":[{"id":"110","name":"jacket","description":"new water resistent white wind breaker","weight":"0.5"}],"database":"mydb","es":1598944288000,"id":7,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"description":"water resistent white wind breaker","weight":"0.2"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944288717,"type":"UPDATE"}
-{"data":[{"order_number":"10001","order_date":"2016-01-16","purchaser":"1001","quantity":"3","product_id":"102"}],"database":"mydb","es":1598944331000,"id":8,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":[{"quantity":"1"}],"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944331870,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":9,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"5.18"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337341,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":9,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337341,"type":"DELETE"}
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"5.17"},{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":10,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"3.14"},{"weight":"8.1"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337663,"type":"UPDATE"}
-{"data":[{"order_number":"10002","order_date":"2016-01-17","purchaser":"1002","quantity":"2","product_id":"105"}],"database":"mydb","es":1598944374000,"id":11,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":null,"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944374999,"type":"DELETE"}
-{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"mydb","es":1598944418000,"id":12,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944418418,"type":"DELETE"}
-{"data":null,"database":"mydb","es":1598944271000,"id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE project (\n  id VARCHAR(255) NOT NULL,\n  name VARCHAR(255) NOT NULL,\n  description VARCHAR(255) NOT NULL,\n  weight FLOAT NOT NULL\n)","sqlType":null,"table":"projects","ts":1598944271192,"type":"CREATE"}
-{"data":[{"id":"A101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"A102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"A103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"A104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"A105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"A106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"A107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"A108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"A109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":14,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"project","ts":1598944146308,"type":"INSERT"}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data.txt
deleted file mode 100644
index a83b7da..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"inventory","es":1589373515000,"id":3,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373515477,"type":"INSERT"}
-{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"inventory","es":1589373546000,"id":4,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"description":null}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373546301,"type":"UPDATE"}
-{"data":[{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.1"}],"database":"inventory","es":1589373549000,"id":5,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373549489,"type":"UPDATE"}
-{"data":[{"id":"110","name":"jacket","description":"water resistent white wind breaker","weight":"0.2"}],"database":"inventory","es":1589373552000,"id":6,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373552882,"type":"INSERT"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.18"}],"database":"inventory","es":1589373555000,"id":7,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373555457,"type":"INSERT"}
-{"data":[{"id":"110","name":"jacket","description":"new water resistent white wind breaker","weight":"0.5"}],"database":"inventory","es":1589373558000,"id":8,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"description":"water resistent white wind breaker","weight":"0.2"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373558230,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"inventory","es":1589373560000,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.18"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373560798,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"inventory","es":1589373563000,"id":10,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373563798,"type":"DELETE"}
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"5.17"},{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"}],"database":"inventory","es":1589373753000,"id":11,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"3.14"},{"weight":"8.1"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373753939,"type":"UPDATE"}
-{"data":null,"database":"inventory","es":1589373566000,"id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `xj_`.`user02` (`uid` int(0) NOT NULL,`uname` varchar(255) NULL, PRIMARY KEY (`uid`))","sqlType":null,"table":"user02","ts":1589373566000,"type":"CREATE"}
-{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"inventory","es":1589374013000,"id":12,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589374013680,"type":"DELETE"}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/complex-schema.json b/flink-formats-kafka/flink-json-debezium/src/test/resources/complex-schema.json
deleted file mode 100644
index 8666428..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/complex-schema.json
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
-  "$schema": "http://json-schema.org/draft-06/schema#",
-  "description": "A representation of a person, company, organization, or place",
-  "type": "object",
-  "required": [
-    "familyName",
-    "givenName"
-  ],
-  "properties": {
-    "fn": {
-      "description": "Formatted Name",
-      "type": "string"
-    },
-    "familyName": {
-      "type": "string"
-    },
-    "additionalName": {
-      "type": "boolean"
-    },
-    "tuples": {
-      "type": "array",
-      "items": [
-        {
-          "type": "number"
-        },
-        {
-          "type": "string"
-        },
-        {
-          "type": "string",
-          "enum": [
-            "Street",
-            "Avenue",
-            "Boulevard"
-          ]
-        },
-        {
-          "type": "string",
-          "enum": [
-            "NW",
-            "NE",
-            "SW",
-            "SE"
-          ]
-        }
-      ],
-      "additionalItems": false
-    },
-    "honorificPrefix": {
-      "type": "array",
-      "items": {
-        "type": "string"
-      }
-    },
-    "url": {
-      "type": "string",
-      "format": "uri"
-    },
-    "email": {
-      "type": "object",
-      "properties": {
-        "type": {
-          "type": "string"
-        },
-        "value": {
-          "type": "string",
-          "format": "email"
-        }
-      }
-    },
-    tel: {
-      "type": "object",
-      "properties": {
-        "type": {
-          "type": "integer"
-        },
-        "value": {
-          "type": "string",
-          "format": "phone"
-        }
-      }
-    },
-    "sound": {
-      "type": "null"
-    },
-    "org": {
-      "type": "object",
-      "properties": {
-        "organizationUnit": {
-          "type": "object",
-          "properties": {}
-        }
-      }
-    }
-  }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-exclude.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-exclude.txt
deleted file mode 100644
index 3763369..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-exclude.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
-{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
-{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
-{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-include.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-include.txt
deleted file mode 100644
index b3e0f7d..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-include.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-replica-identity.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-replica-identity.txt
deleted file mode 100644
index d4fcb88..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-replica-identity.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099434,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099434,"transaction":null}
-{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099438,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010889629,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":602,"lsn":34131104,"xmin":null},"op":"u","ts_ms":1596010890411,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010930407,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":603,"lsn":34132200,"xmin":null},"op":"u","ts_ms":1596010930623,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010946488,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":604,"lsn":34132560,"xmin":null},"op":"c","ts_ms":1596010946870,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010976756,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":605,"lsn":34133072,"xmin":null},"op":"c","ts_ms":1596010976880,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010982228,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":606,"lsn":34133344,"xmin":null},"op":"u","ts_ms":1596010982481,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010985627,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":607,"lsn":34133584,"xmin":null},"op":"u","ts_ms":1596010986047,"transaction":null}
-{"before":null,"after":null,"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010988168,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":608,"lsn":34133800,"xmin":null},"op":"d","ts_ms":1596010988596,"transaction":null}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-exclude.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-exclude.txt
deleted file mode 100644
index 993f5f4..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-exclude.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099434,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099434,"transaction":null}
-{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099438,"transaction":null}
-{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010889629,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":602,"lsn":34131104,"xmin":null},"op":"u","ts_ms":1596010890411,"transaction":null}
-{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010930407,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":603,"lsn":34132200,"xmin":null},"op":"u","ts_ms":1596010930623,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010946488,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":604,"lsn":34132560,"xmin":null},"op":"c","ts_ms":1596010946870,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010976756,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":605,"lsn":34133072,"xmin":null},"op":"c","ts_ms":1596010976880,"transaction":null}
-{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010982228,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":606,"lsn":34133344,"xmin":null},"op":"u","ts_ms":1596010982481,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010985627,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":607,"lsn":34133584,"xmin":null},"op":"u","ts_ms":1596010986047,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"after":null,"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010988168,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":608,"lsn":34133800,"xmin":null},"op":"d","ts_ms":1596010988596,"transaction":null}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-include.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-include.txt
deleted file mode 100644
index 8301935..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-include.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099434,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099434,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099438,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010889629,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":602,"lsn":34131104,"xmin":null},"op":"u","ts_ms":1596010890411,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010930407,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":603,"lsn":34132200,"xmin":null},"op":"u","ts_ms":1596010930623,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010946488,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":604,"lsn":34132560,"xmin":null},"op":"c","ts_ms":1596010946870,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010976756,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":605,"lsn":34133072,"xmin":null},"op":"c","ts_ms":1596010976880,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010982228,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":606,"lsn":34133344,"xmin":null},"op":"u","ts_ms":1596010982481,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010985627,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":607,"lsn":34133584,"xmin":null},"op":"u","ts_ms":1596010986047,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"after":null,"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010988168,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":608,"lsn":34133800,"xmin":null},"op":"d","ts_ms":1596010988596,"transaction":null}}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/maxwell-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/maxwell-data.txt
deleted file mode 100644
index 2d33ff7..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/maxwell-data.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"commit":true,"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684893,"xid":7152,"commit":true,"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"old":{"description":"16oz carpenter's hammer"},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684897,"xid":7169,"commit":true,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"old":{"weight":5.3},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684900,"xid":7186,"commit":true,"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684904,"xid":7201,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"old":{"description":"water resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684912,"xid":7235,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"old":{"weight":5.18},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"delete","ts":1596684914,"xid":7250,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":5.17},"old":{"weight":3.14},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"xoffset":0,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"commit":true,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
deleted file mode 100644
index d295925..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","primary_keys":["id"],"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000144","primary_keys":["id"],"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000145","primary_keys":["id"],"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000146","primary_keys":["id"],"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000147","primary_keys":["id"],"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000148","primary_keys":["id"],"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000149","primary_keys":["id"],"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000150","primary_keys":["id"],"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000151","primary_keys":["id"],"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000152","primary_keys":["id"],"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op_type":"U","op_ts":"2020-05-13 17:26:27.936000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000153","primary_keys":["id"],"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op_type":"U","op_ts":"2020-05-13 17:28:19.505000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000154","primary_keys":["id"],"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","op_ts":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op_type":"I","op_ts":"2020-05-13 17:30:10.230000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000155","primary_keys":["id"],"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op_type":"I","op_ts":"2020-05-13 17:30:43.428000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000156","primary_keys":["id"],"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op_type":"U","op_ts":"2020-05-13 17:32:20.327000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000157","primary_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op_type":"U","op_ts":"2020-05-13 17:32:10.904000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000158","primary_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"op_type":"D","op_ts":"2020-05-13 17:32:24.455000"}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/reference-schema.json b/flink-formats-kafka/flink-json-debezium/src/test/resources/reference-schema.json
deleted file mode 100644
index 99e0e79..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/reference-schema.json
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "definitions": {
-    "address": {
-      "type": "object",
-      "properties": {
-        "street_address": {
-          "type": "string"
-        },
-        "city": {
-          "type": "string"
-        },
-        "state": {
-          "type": "string"
-        }
-      },
-      "required": [
-        "street_address",
-        "city",
-        "state"
-      ]
-    }
-  },
-  "type": "object",
-  "properties": {
-    "billing_address": {
-      "$ref": "#/definitions/address"
-    },
-    "shipping_address": {
-      "$ref": "#/definitions/address"
-    },
-    "optional_address": {
-      "oneOf": [
-        {
-          "type": "null"
-        },
-        {
-          "$ref": "#/definitions/address"
-        }
-      ]
-    }
-  }
-}
diff --git a/flink-formats-kafka/pom.xml b/flink-formats-kafka/pom.xml
index dbe67f5..8ab527e 100644
--- a/flink-formats-kafka/pom.xml
+++ b/flink-formats-kafka/pom.xml
@@ -35,7 +35,6 @@
 
   <modules>
     <module>flink-avro-confluent-registry</module>
-    <module>flink-json-debezium</module>
     <module>flink-sql-avro-confluent-registry</module>
   </modules>