[FLINK-31747] remove debezium json for now
This closes #16.
diff --git a/flink-formats-kafka/flink-json-debezium/archunit-violations/24119078-1071-4906-b2ac-ed57c8154eaa b/flink-formats-kafka/flink-json-debezium/archunit-violations/24119078-1071-4906-b2ac-ed57c8154eaa
deleted file mode 100644
index e69de29..0000000
--- a/flink-formats-kafka/flink-json-debezium/archunit-violations/24119078-1071-4906-b2ac-ed57c8154eaa
+++ /dev/null
diff --git a/flink-formats-kafka/flink-json-debezium/archunit-violations/62c5e4e5-2b0e-41ed-a268-ee33d5edd162 b/flink-formats-kafka/flink-json-debezium/archunit-violations/62c5e4e5-2b0e-41ed-a268-ee33d5edd162
deleted file mode 100644
index e69de29..0000000
--- a/flink-formats-kafka/flink-json-debezium/archunit-violations/62c5e4e5-2b0e-41ed-a268-ee33d5edd162
+++ /dev/null
diff --git a/flink-formats-kafka/flink-json-debezium/archunit-violations/stored.rules b/flink-formats-kafka/flink-json-debezium/archunit-violations/stored.rules
deleted file mode 100644
index d43a144..0000000
--- a/flink-formats-kafka/flink-json-debezium/archunit-violations/stored.rules
+++ /dev/null
@@ -1,4 +0,0 @@
-#
-#Mon Apr 04 17:11:32 CEST 2022
-Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=24119078-1071-4906-b2ac-ed57c8154eaa
-ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=62c5e4e5-2b0e-41ed-a268-ee33d5edd162
diff --git a/flink-formats-kafka/flink-json-debezium/pom.xml b/flink-formats-kafka/flink-json-debezium/pom.xml
deleted file mode 100644
index 147cf0e..0000000
--- a/flink-formats-kafka/flink-json-debezium/pom.xml
+++ /dev/null
@@ -1,141 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-formats-kafka</artifactId>
- <version>4.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>flink-json-debezium</artifactId>
- <name>Flink : Formats : Json Debezium</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
-
- <!-- test dependencies -->
-
- <!-- Required for integration test -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- JSON table descriptor testing -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <!-- JSON RowData schema test dependency -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Json filesystem format factory ITCase test dependency -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <!-- test utils dependency -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-test-utils</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- ArchUit test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-architecture-tests-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <includes>
- <include>org.apache.flink:flink-format-common</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
deleted file mode 100644
index 706d01a..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.Projection;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** {@link DecodingFormat} for Debezium using JSON encoding. */
-public class DebeziumJsonDecodingFormat
- implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
-
- // --------------------------------------------------------------------------------------------
- // Mutable attributes
- // --------------------------------------------------------------------------------------------
-
- private List<String> metadataKeys;
-
- // --------------------------------------------------------------------------------------------
- // Debezium-specific attributes
- // --------------------------------------------------------------------------------------------
-
- private final boolean schemaInclude;
-
- private final boolean ignoreParseErrors;
-
- private final TimestampFormat timestampFormat;
-
- public DebeziumJsonDecodingFormat(
- boolean schemaInclude, boolean ignoreParseErrors, TimestampFormat timestampFormat) {
- this.schemaInclude = schemaInclude;
- this.ignoreParseErrors = ignoreParseErrors;
- this.timestampFormat = timestampFormat;
- this.metadataKeys = Collections.emptyList();
- }
-
- @Override
- public DeserializationSchema<RowData> createRuntimeDecoder(
- DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
- physicalDataType = Projection.of(projections).project(physicalDataType);
-
- final List<ReadableMetadata> readableMetadata =
- metadataKeys.stream()
- .map(
- k ->
- Stream.of(ReadableMetadata.values())
- .filter(rm -> rm.key.equals(k))
- .findFirst()
- .orElseThrow(IllegalStateException::new))
- .collect(Collectors.toList());
-
- final List<DataTypes.Field> metadataFields =
- readableMetadata.stream()
- .map(m -> DataTypes.FIELD(m.key, m.dataType))
- .collect(Collectors.toList());
-
- final DataType producedDataType =
- DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
-
- final TypeInformation<RowData> producedTypeInfo =
- context.createTypeInformation(producedDataType);
-
- return new DebeziumJsonDeserializationSchema(
- physicalDataType,
- readableMetadata,
- producedTypeInfo,
- schemaInclude,
- ignoreParseErrors,
- timestampFormat);
- }
-
- @Override
- public Map<String, DataType> listReadableMetadata() {
- final Map<String, DataType> metadataMap = new LinkedHashMap<>();
- Stream.of(ReadableMetadata.values())
- .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
- return metadataMap;
- }
-
- @Override
- public void applyReadableMetadata(List<String> metadataKeys) {
- this.metadataKeys = metadataKeys;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.UPDATE_BEFORE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .addContainedKind(RowKind.DELETE)
- .build();
- }
-
- // --------------------------------------------------------------------------------------------
- // Metadata handling
- // --------------------------------------------------------------------------------------------
-
- /** List of metadata that can be read with this format. */
- enum ReadableMetadata {
- SCHEMA(
- "schema",
- DataTypes.STRING().nullable(),
- false,
- DataTypes.FIELD("schema", DataTypes.STRING()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getString(pos);
- }
- }),
-
- INGESTION_TIMESTAMP(
- "ingestion-timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
- true,
- DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- if (row.isNullAt(pos)) {
- return null;
- }
- return TimestampData.fromEpochMillis(row.getLong(pos));
- }
- }),
-
- SOURCE_TIMESTAMP(
- "source.timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- final StringData timestamp =
- (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
- if (timestamp == null) {
- return null;
- }
- return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
- }
- }),
-
- SOURCE_DATABASE(
- "source.database",
- DataTypes.STRING().nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return readProperty(row, pos, KEY_SOURCE_DATABASE);
- }
- }),
-
- SOURCE_SCHEMA(
- "source.schema",
- DataTypes.STRING().nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return readProperty(row, pos, KEY_SOURCE_SCHEMA);
- }
- }),
-
- SOURCE_TABLE(
- "source.table",
- DataTypes.STRING().nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return readProperty(row, pos, KEY_SOURCE_TABLE);
- }
- }),
-
- SOURCE_PROPERTIES(
- "source.properties",
- // key and value of the map are nullable to make handling easier in queries
- DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
- .nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getMap(pos);
- }
- });
-
- final String key;
-
- final DataType dataType;
-
- final boolean isJsonPayload;
-
- final DataTypes.Field requiredJsonField;
-
- final MetadataConverter converter;
-
- ReadableMetadata(
- String key,
- DataType dataType,
- boolean isJsonPayload,
- DataTypes.Field requiredJsonField,
- MetadataConverter converter) {
- this.key = key;
- this.dataType = dataType;
- this.isJsonPayload = isJsonPayload;
- this.requiredJsonField = requiredJsonField;
- this.converter = converter;
- }
- }
-
- private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");
-
- private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");
-
- private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");
-
- private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
-
- private static Object readProperty(GenericRowData row, int pos, StringData key) {
- final GenericMapData map = (GenericMapData) row.getMap(pos);
- if (map == null) {
- return null;
- }
- return map.get(key);
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
deleted file mode 100644
index cba336d..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import static java.lang.String.format;
-
-/**
- * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure {@link
- * RowData}. The deserialization schema knows Debezium's schema definition and can extract the
- * database data and convert into {@link RowData} with {@link RowKind}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @see <a href="https://debezium.io/">Debezium</a>
- */
-@Internal
-public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
- private static final long serialVersionUID = 1L;
-
- private static final String OP_READ = "r"; // snapshot read
- private static final String OP_CREATE = "c"; // insert
- private static final String OP_UPDATE = "u"; // update
- private static final String OP_DELETE = "d"; // delete
-
- private static final String REPLICA_IDENTITY_EXCEPTION =
- "The \"before\" field of %s message is null, "
- + "if you are using Debezium Postgres Connector, "
- + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
-
- /** The deserializer to deserialize Debezium JSON data. */
- private final JsonRowDataDeserializationSchema jsonDeserializer;
-
- /** Flag that indicates that an additional projection is required for metadata. */
- private final boolean hasMetadata;
-
- /** Metadata to be extracted for every record. */
- private final MetadataConverter[] metadataConverters;
-
- /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
- private final TypeInformation<RowData> producedTypeInfo;
-
- /**
- * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
- * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
- * information, but we just ignore "schema" and extract data from "payload".
- */
- private final boolean schemaInclude;
-
- /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
- private final boolean ignoreParseErrors;
-
- public DebeziumJsonDeserializationSchema(
- DataType physicalDataType,
- List<ReadableMetadata> requestedMetadata,
- TypeInformation<RowData> producedTypeInfo,
- boolean schemaInclude,
- boolean ignoreParseErrors,
- TimestampFormat timestampFormat) {
- final RowType jsonRowType =
- createJsonRowType(physicalDataType, requestedMetadata, schemaInclude);
- this.jsonDeserializer =
- new JsonRowDataDeserializationSchema(
- jsonRowType,
- // the result type is never used, so it's fine to pass in the produced type
- // info
- producedTypeInfo,
- false, // ignoreParseErrors already contains the functionality of
- // failOnMissingField
- ignoreParseErrors,
- timestampFormat);
- this.hasMetadata = requestedMetadata.size() > 0;
- this.metadataConverters =
- createMetadataConverters(jsonRowType, requestedMetadata, schemaInclude);
- this.producedTypeInfo = producedTypeInfo;
- this.schemaInclude = schemaInclude;
- this.ignoreParseErrors = ignoreParseErrors;
- }
-
- @Override
- public void open(InitializationContext context) throws Exception {
- jsonDeserializer.open(context);
- }
-
- @Override
- public RowData deserialize(byte[] message) {
- throw new RuntimeException(
- "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
- }
-
- @Override
- public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
- if (message == null || message.length == 0) {
- // skip tombstone messages
- return;
- }
- try {
- GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
- GenericRowData payload;
- if (schemaInclude) {
- payload = (GenericRowData) row.getField(0);
- } else {
- payload = row;
- }
-
- GenericRowData before = (GenericRowData) payload.getField(0);
- GenericRowData after = (GenericRowData) payload.getField(1);
- String op = payload.getField(2).toString();
- if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
- after.setRowKind(RowKind.INSERT);
- emitRow(row, after, out);
- } else if (OP_UPDATE.equals(op)) {
- if (before == null) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
- }
- before.setRowKind(RowKind.UPDATE_BEFORE);
- after.setRowKind(RowKind.UPDATE_AFTER);
- emitRow(row, before, out);
- emitRow(row, after, out);
- } else if (OP_DELETE.equals(op)) {
- if (before == null) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
- }
- before.setRowKind(RowKind.DELETE);
- emitRow(row, before, out);
- } else {
- if (!ignoreParseErrors) {
- throw new IOException(
- format(
- "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
- op, new String(message)));
- }
- }
- } catch (Throwable t) {
- // a big try catch to protect the processing.
- if (!ignoreParseErrors) {
- throw new IOException(
- format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
- }
- }
- }
-
- private void emitRow(
- GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
- // shortcut in case no output projection is required
- if (!hasMetadata) {
- out.collect(physicalRow);
- return;
- }
-
- final int physicalArity = physicalRow.getArity();
- final int metadataArity = metadataConverters.length;
-
- final GenericRowData producedRow =
- new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
-
- for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
- producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
- }
-
- for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
- producedRow.setField(
- physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
- }
-
- out.collect(producedRow);
- }
-
- @Override
- public boolean isEndOfStream(RowData nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return producedTypeInfo;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
- return Objects.equals(jsonDeserializer, that.jsonDeserializer)
- && hasMetadata == that.hasMetadata
- && Objects.equals(producedTypeInfo, that.producedTypeInfo)
- && schemaInclude == that.schemaInclude
- && ignoreParseErrors == that.ignoreParseErrors;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- jsonDeserializer, hasMetadata, producedTypeInfo, schemaInclude, ignoreParseErrors);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static RowType createJsonRowType(
- DataType physicalDataType,
- List<ReadableMetadata> readableMetadata,
- boolean schemaInclude) {
- DataType payload =
- DataTypes.ROW(
- DataTypes.FIELD("before", physicalDataType),
- DataTypes.FIELD("after", physicalDataType),
- DataTypes.FIELD("op", DataTypes.STRING()));
-
- // append fields that are required for reading metadata in the payload
- final List<DataTypes.Field> payloadMetadataFields =
- readableMetadata.stream()
- .filter(m -> m.isJsonPayload)
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
-
- DataType root = payload;
- if (schemaInclude) {
- // when Debezium Kafka connect enables "value.converter.schemas.enable",
- // the JSON will contain "schema" information and we need to extract data from
- // "payload".
- root = DataTypes.ROW(DataTypes.FIELD("payload", payload));
- }
-
- // append fields that are required for reading metadata in the root
- final List<DataTypes.Field> rootMetadataFields =
- readableMetadata.stream()
- .filter(m -> !m.isJsonPayload)
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
-
- return (RowType) root.getLogicalType();
- }
-
- private static MetadataConverter[] createMetadataConverters(
- RowType jsonRowType, List<ReadableMetadata> requestedMetadata, boolean schemaInclude) {
- return requestedMetadata.stream()
- .map(
- m -> {
- if (m.isJsonPayload) {
- return convertInPayload(jsonRowType, m, schemaInclude);
- } else {
- return convertInRoot(jsonRowType, m);
- }
- })
- .toArray(MetadataConverter[]::new);
- }
-
- private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
- final int pos = findFieldPos(metadata, jsonRowType);
- return new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData root, int unused) {
- return metadata.converter.convert(root, pos);
- }
- };
- }
-
- private static MetadataConverter convertInPayload(
- RowType jsonRowType, ReadableMetadata metadata, boolean schemaInclude) {
- if (schemaInclude) {
- final int pos = findFieldPos(metadata, (RowType) jsonRowType.getChildren().get(0));
- return new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData root, int unused) {
- final GenericRowData payload = (GenericRowData) root.getField(0);
- return metadata.converter.convert(payload, pos);
- }
- };
- }
- return convertInRoot(jsonRowType, metadata);
- }
-
- private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
- return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Converter that extracts a metadata field from the row (root or payload) that comes out of the
- * JSON schema and converts it to the desired data type.
- */
- interface MetadataConverter extends Serializable {
-
- // Method for top-level access.
- default Object convert(GenericRowData row) {
- return convert(row, -1);
- }
-
- Object convert(GenericRowData row, int pos);
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
deleted file mode 100644
index d72fcd2..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonFormatOptionsUtil;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DeserializationFormatFactory;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.SerializationFormatFactory;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.SCHEMA_INCLUDE;
-import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.TIMESTAMP_FORMAT;
-
-/**
- * Format factory for providing configured instances of Debezium JSON to RowData {@link
- * DeserializationSchema}.
- */
-@Internal
-public class DebeziumJsonFormatFactory
- implements DeserializationFormatFactory, SerializationFormatFactory {
-
- public static final String IDENTIFIER = "debezium-json";
-
- @Override
- public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
- DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-
- FactoryUtil.validateFactoryOptions(this, formatOptions);
- validateDecodingFormatOptions(formatOptions);
-
- final boolean schemaInclude = formatOptions.get(SCHEMA_INCLUDE);
-
- final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
-
- final TimestampFormat timestampFormat =
- JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
-
- return new DebeziumJsonDecodingFormat(schemaInclude, ignoreParseErrors, timestampFormat);
- }
-
- @Override
- public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
- DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-
- FactoryUtil.validateFactoryOptions(this, formatOptions);
- validateEncodingFormatOptions(formatOptions);
-
- TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
- JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
- JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
- String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
-
- final boolean encodeDecimalAsPlainNumber =
- formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
-
- return new EncodingFormat<SerializationSchema<RowData>>() {
-
- @Override
- public ChangelogMode getChangelogMode() {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.UPDATE_BEFORE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .addContainedKind(RowKind.DELETE)
- .build();
- }
-
- @Override
- public SerializationSchema<RowData> createRuntimeEncoder(
- DynamicTableSink.Context context, DataType consumedDataType) {
- final RowType rowType = (RowType) consumedDataType.getLogicalType();
- return new DebeziumJsonSerializationSchema(
- rowType,
- timestampFormat,
- mapNullKeyMode,
- mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
- }
- };
- }
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(SCHEMA_INCLUDE);
- options.add(IGNORE_PARSE_ERRORS);
- options.add(TIMESTAMP_FORMAT);
- options.add(JSON_MAP_NULL_KEY_MODE);
- options.add(JSON_MAP_NULL_KEY_LITERAL);
- options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
- return options;
- }
-
- /** Validator for debezium decoding format. */
- private static void validateDecodingFormatOptions(ReadableConfig tableOptions) {
- JsonFormatOptionsUtil.validateDecodingFormatOptions(tableOptions);
- }
-
- /** Validator for debezium encoding format. */
- private static void validateEncodingFormatOptions(ReadableConfig tableOptions) {
- JsonFormatOptionsUtil.validateEncodingFormatOptions(tableOptions);
-
- // validator for {@link SCHEMA_INCLUDE}
- if (tableOptions.get(SCHEMA_INCLUDE)) {
- throw new ValidationException(
- String.format(
- "Debezium JSON serialization doesn't support '%s.%s' option been set to true.",
- IDENTIFIER, SCHEMA_INCLUDE.key()));
- }
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatOptions.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatOptions.java
deleted file mode 100644
index bf338a9..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatOptions.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.formats.json.JsonFormatOptions;
-
-/** Option utils for debezium-json format. */
-@PublicEvolving
-public class DebeziumJsonFormatOptions {
-
- public static final ConfigOption<Boolean> SCHEMA_INCLUDE =
- ConfigOptions.key("schema-include")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "When setting up a Debezium Kafka Connect, users can enable "
- + "a Kafka configuration 'value.converter.schemas.enable' to include schema in the message. "
- + "This option indicates the Debezium JSON data include the schema in the message or not. "
- + "Default is false.");
-
- public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
- JsonFormatOptions.IGNORE_PARSE_ERRORS;
-
- public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
-
- public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE =
- JsonFormatOptions.MAP_NULL_KEY_MODE;
-
- public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL =
- JsonFormatOptions.MAP_NULL_KEY_LITERAL;
-
- private DebeziumJsonFormatOptions() {}
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
deleted file mode 100644
index 0dc9a96..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-
-import java.util.Objects;
-
-import static java.lang.String.format;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-
-/**
- * Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Debezium
- * JSON.
- *
- * @see <a href="https://debezium.io/">Debezium</a>
- */
-public class DebeziumJsonSerializationSchema implements SerializationSchema<RowData> {
- private static final long serialVersionUID = 1L;
-
- private static final StringData OP_INSERT = StringData.fromString("c"); // insert
- private static final StringData OP_DELETE = StringData.fromString("d"); // delete
-
- /** The serializer to serialize Debezium JSON data. * */
- private final JsonRowDataSerializationSchema jsonSerializer;
-
- private transient GenericRowData genericRowData;
-
- public DebeziumJsonSerializationSchema(
- RowType rowType,
- TimestampFormat timestampFormat,
- JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
- jsonSerializer =
- new JsonRowDataSerializationSchema(
- createJsonRowType(fromLogicalToDataType(rowType)),
- timestampFormat,
- mapNullKeyMode,
- mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
- }
-
- @Override
- public void open(InitializationContext context) throws Exception {
- jsonSerializer.open(context);
- genericRowData = new GenericRowData(3);
- }
-
- @Override
- public byte[] serialize(RowData rowData) {
- try {
- switch (rowData.getRowKind()) {
- case INSERT:
- case UPDATE_AFTER:
- genericRowData.setField(0, null);
- genericRowData.setField(1, rowData);
- genericRowData.setField(2, OP_INSERT);
- return jsonSerializer.serialize(genericRowData);
- case UPDATE_BEFORE:
- case DELETE:
- genericRowData.setField(0, rowData);
- genericRowData.setField(1, null);
- genericRowData.setField(2, OP_DELETE);
- return jsonSerializer.serialize(genericRowData);
- default:
- throw new UnsupportedOperationException(
- format(
- "Unsupported operation '%s' for row kind.",
- rowData.getRowKind()));
- }
- } catch (Throwable t) {
- throw new RuntimeException(format("Could not serialize row '%s'.", rowData), t);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DebeziumJsonSerializationSchema that = (DebeziumJsonSerializationSchema) o;
- return Objects.equals(jsonSerializer, that.jsonSerializer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(jsonSerializer);
- }
-
- private static RowType createJsonRowType(DataType databaseSchema) {
- // Debezium JSON contains some other information, e.g. "source", "ts_ms"
- // but we don't need them.
- return (RowType)
- DataTypes.ROW(
- DataTypes.FIELD("before", databaseSchema),
- DataTypes.FIELD("after", databaseSchema),
- DataTypes.FIELD("op", DataTypes.STRING()))
- .getLogicalType();
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
deleted file mode 100644
index 3b83658..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
deleted file mode 100644
index ad61f21..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.architecture;
-
-import org.apache.flink.architecture.common.ImportOptions;
-
-import com.tngtech.archunit.core.importer.ImportOption;
-import com.tngtech.archunit.junit.AnalyzeClasses;
-import com.tngtech.archunit.junit.ArchTest;
-import com.tngtech.archunit.junit.ArchTests;
-
-/** Architecture tests for test code. */
-@AnalyzeClasses(
- packages = {"org.apache.flink.formats.json"},
- importOptions = {
- ImportOption.OnlyIncludeTests.class,
- ImportOptions.ExcludeScalaImportOption.class,
- ImportOptions.ExcludeShadedImportOption.class
- })
-public class TestCodeArchitectureTest {
-
- @ArchTest
- public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
deleted file mode 100644
index 0c84351..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.CollectionUtil;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static java.lang.String.format;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test Filesystem connector with DebeziumJson. */
-class DebeziumJsonFileSystemITCase extends StreamingTestBase {
-
- private static final List<String> EXPECTED =
- Arrays.asList(
- "+I[101, SCOOTER, Small 2-wheel scooter, 3.14]",
- "+I[102, CAR BATTERY, 12V car battery, 8.1]",
- "+I[103, 12-PACK DRILL BITS, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
- "+I[104, HAMMER, 12oz carpenter's hammer, 0.75]",
- "+I[105, HAMMER, 14oz carpenter's hammer, 0.875]",
- "+I[106, HAMMER, 16oz carpenter's hammer, 1.0]",
- "+I[107, ROCKS, box of assorted rocks, 5.3]",
- "+I[108, JACKET, water resistent black wind breaker, 0.1]",
- "+I[109, SPARE TIRE, 24 inch spare tire, 22.2]",
- "-D[106, HAMMER, 16oz carpenter's hammer, 1.0]", // -U
- "+I[106, HAMMER, 18oz carpenter hammer, 1.0]", // +U
- "-D[107, ROCKS, box of assorted rocks, 5.3]", // -U
- "+I[107, ROCKS, box of assorted rocks, 5.1]", // +U
- "+I[110, JACKET, water resistent white wind breaker, 0.2]",
- "+I[111, SCOOTER, Big 2-wheel scooter , 5.18]",
- "-D[110, JACKET, water resistent white wind breaker, 0.2]", // -U
- "+I[110, JACKET, new water resistent white wind breaker, 0.5]", // +U
- "-D[111, SCOOTER, Big 2-wheel scooter , 5.18]", // -U
- "+I[111, SCOOTER, Big 2-wheel scooter , 5.17]", // +U
- "-D[111, SCOOTER, Big 2-wheel scooter , 5.17]");
-
- private File source;
- private File sink;
-
- private void prepareTables(boolean isPartition, Path tempSourceDir, Path tempSinkDir)
- throws IOException {
- byte[] bytes = readBytes("debezium-data-schema-exclude.txt");
- source = tempSourceDir.toFile();
- File file;
- if (isPartition) {
- File partition = new File(source, "p=1");
- partition.mkdirs();
- file = new File(partition, "my_file");
- } else {
- file = new File(source, "my_file");
- }
- file.createNewFile();
- Files.write(file.toPath(), bytes);
-
- sink = tempSinkDir.toFile();
-
- env().setParallelism(1);
- }
-
- private void createTable(boolean isSink, String path, boolean isPartition) {
- tEnv().executeSql(
- format("create table %s (", isSink ? "sink" : "source")
- + "id int, name string,"
- + (isSink ? "upper_name string," : "")
- + " description string, weight float"
- + (isPartition ? ", p int) partitioned by (p) " : ")")
- + " with ("
- + "'connector'='filesystem',"
- + "'format'='debezium-json',"
- + format("'path'='%s'", path)
- + ")");
- }
-
- @Test
- void testNonPartition(@TempDir Path tempSourceDir, @TempDir Path tempSinkDir) throws Exception {
- prepareTables(false, tempSourceDir, tempSinkDir);
- createTable(false, source.toURI().toString(), false);
- createTable(true, sink.toURI().toString(), false);
-
- tEnv().executeSql(
- "insert into sink select id,name,UPPER(name),description,weight from source")
- .await();
- CloseableIterator<Row> iter =
- tEnv().executeSql("select id,upper_name,description,weight from sink").collect();
-
- List<String> results =
- CollectionUtil.iteratorToList(iter).stream()
- .map(Row::toString)
- .collect(Collectors.toList());
- iter.close();
-
- assertThat(results).isEqualTo(EXPECTED);
- }
-
- @Test
- void testPartition(@TempDir Path tempSourceDir, @TempDir Path tempSinkDir) throws Exception {
- prepareTables(true, tempSourceDir, tempSinkDir);
- createTable(false, source.toURI().toString(), true);
- createTable(true, sink.toURI().toString(), true);
-
- tEnv().executeSql(
- "insert into sink select id,name,UPPER(name),description,weight,p from source")
- .await();
- CloseableIterator<Row> iter =
- tEnv().executeSql("select id,upper_name,description,weight,p from sink").collect();
- List<Row> list = CollectionUtil.iteratorToList(iter);
- iter.close();
-
- List<String> results =
- list.stream()
- .map(row -> Row.project(row, new int[] {0, 1, 2, 3}))
- .map(Row::toString)
- .collect(Collectors.toList());
-
- assertThat(results).isEqualTo(EXPECTED);
-
- // check partition value
- for (Row row : list) {
- assertThat(row.getField(4)).isEqualTo(1);
- }
- }
-
- private static byte[] readBytes(String resource) throws IOException {
- final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
- assert url != null;
- Path path = new File(url.getFile()).toPath();
- return Files.readAllBytes(path);
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
deleted file mode 100644
index d000877..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TestDynamicTableFactory;
-import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
-import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Consumer;
-
-import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
-import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
-import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
-import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
-import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for {@link DebeziumJsonFormatFactory}. */
-class DebeziumJsonFormatFactoryTest {
-
- @Test
- void testSeDeSchema() {
- final DebeziumJsonDeserializationSchema expectedDeser =
- new DebeziumJsonDeserializationSchema(
- PHYSICAL_DATA_TYPE,
- Collections.emptyList(),
- InternalTypeInfo.of(PHYSICAL_TYPE),
- false,
- true,
- TimestampFormat.ISO_8601);
-
- final Map<String, String> options = getAllOptions();
-
- final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
- assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
- TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
- (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
-
- DeserializationSchema<RowData> actualDeser =
- scanSourceMock.valueFormat.createRuntimeDecoder(
- ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
-
- assertThat(actualDeser).isEqualTo(expectedDeser);
-
- final DebeziumJsonSerializationSchema expectedSer =
- new DebeziumJsonSerializationSchema(
- (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
- TimestampFormat.ISO_8601,
- JsonFormatOptions.MapNullKeyMode.LITERAL,
- "null",
- true);
-
- final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
- assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
- TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
- (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
-
- SerializationSchema<RowData> actualSer =
- sinkMock.valueFormat.createRuntimeEncoder(
- new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
-
- assertThat(actualSer).isEqualTo(expectedSer);
- }
-
- @Test
- void testInvalidIgnoreParseError() {
- final Map<String, String> options =
- getModifiedOptions(opts -> opts.put("debezium-json.ignore-parse-errors", "abc"));
-
- assertThatThrownBy(() -> createTableSource(SCHEMA, options))
- .satisfies(
- anyCauseMatches(
- IllegalArgumentException.class,
- "Unrecognized option for boolean: abc. "
- + "Expected either true or false(case insensitive)"));
- }
-
- @Test
- void testSchemaIncludeOption() {
- Map<String, String> options = getAllOptions();
- options.put("debezium-json.schema-include", "true");
-
- final DebeziumJsonDeserializationSchema expectedDeser =
- new DebeziumJsonDeserializationSchema(
- PHYSICAL_DATA_TYPE,
- Collections.emptyList(),
- InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
- true,
- true,
- TimestampFormat.ISO_8601);
- final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
- TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
- (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
- DeserializationSchema<RowData> actualDeser =
- scanSourceMock.valueFormat.createRuntimeDecoder(
- ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
- assertThat(actualDeser).isEqualTo(expectedDeser);
-
- assertThatThrownBy(
- () -> {
- final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
- TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
- (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
- sinkMock.valueFormat.createRuntimeEncoder(
- new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
- })
- .satisfies(
- anyCauseMatches(
- RuntimeException.class,
- "Debezium JSON serialization doesn't support "
- + "'debezium-json.schema-include' option been set to true."));
- }
-
- @Test
- void testInvalidOptionForTimestampFormat() {
- final Map<String, String> tableOptions =
- getModifiedOptions(
- opts -> opts.put("debezium-json.timestamp-format.standard", "test"));
-
- assertThatThrownBy(() -> createTableSource(SCHEMA, tableOptions))
- .isInstanceOf(ValidationException.class)
- .satisfies(
- anyCauseMatches(
- ValidationException.class,
- "Unsupported value 'test' for timestamp-format.standard. "
- + "Supported values are [SQL, ISO-8601]."));
- }
-
- @Test
- void testInvalidOptionForMapNullKeyMode() {
- final Map<String, String> tableOptions =
- getModifiedOptions(opts -> opts.put("debezium-json.map-null-key.mode", "invalid"));
-
- assertThatThrownBy(() -> createTableSink(SCHEMA, tableOptions))
- .isInstanceOf(ValidationException.class)
- .satisfies(
- anyCauseMatches(
- ValidationException.class,
- "Unsupported value 'invalid' for option map-null-key.mode. "
- + "Supported values are [LITERAL, FAIL, DROP]."));
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Returns the full options modified by the given consumer {@code optionModifier}.
- *
- * @param optionModifier Consumer to modify the options
- */
- private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
- Map<String, String> options = getAllOptions();
- optionModifier.accept(options);
- return options;
- }
-
- private Map<String, String> getAllOptions() {
- final Map<String, String> options = new HashMap<>();
- options.put("connector", TestDynamicTableFactory.IDENTIFIER);
- options.put("target", "MyTarget");
- options.put("buffer-size", "1000");
-
- options.put("format", "debezium-json");
- options.put("debezium-json.ignore-parse-errors", "true");
- options.put("debezium-json.timestamp-format.standard", "ISO-8601");
- options.put("debezium-json.map-null-key.mode", "LITERAL");
- options.put("debezium-json.map-null-key.literal", "null");
- options.put("debezium-json.encode.decimal-as-plain-number", "true");
- return options;
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
deleted file mode 100644
index 3b9151f..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json.debezium;
-
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.util.Collector;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.connector.testutils.formats.SchemaTestUtils.open;
-import static org.apache.flink.table.api.DataTypes.FIELD;
-import static org.apache.flink.table.api.DataTypes.FLOAT;
-import static org.apache.flink.table.api.DataTypes.INT;
-import static org.apache.flink.table.api.DataTypes.ROW;
-import static org.apache.flink.table.api.DataTypes.STRING;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/**
- * Tests for {@link DebeziumJsonSerializationSchema} and {@link DebeziumJsonDeserializationSchema}.
- */
-class DebeziumJsonSerDeSchemaTest {
-
- private static final DataType PHYSICAL_DATA_TYPE =
- ROW(
- FIELD("id", INT().notNull()),
- FIELD("name", STRING()),
- FIELD("description", STRING()),
- FIELD("weight", FLOAT()));
-
- @Test
- void testSerializationAndSchemaIncludeDeserialization() throws Exception {
- testSerializationDeserialization("debezium-data-schema-include.txt", true);
- }
-
- @Test
- void testSerializationAndSchemaExcludeDeserialization() throws Exception {
- testSerializationDeserialization("debezium-data-schema-exclude.txt", false);
- }
-
- @Test
- void testSerializationAndPostgresSchemaIncludeDeserialization() throws Exception {
- testSerializationDeserialization("debezium-postgres-data-schema-include.txt", true);
- }
-
- @Test
- void testSerializationAndPostgresSchemaExcludeDeserialization() throws Exception {
- testSerializationDeserialization("debezium-postgres-data-schema-exclude.txt", false);
- }
-
- @Test
- void testPostgresDefaultReplicaIdentify() {
- assertThatThrownBy(
- () ->
- testSerializationDeserialization(
- "debezium-postgres-data-replica-identity.txt", false))
- .as(
- "The \"before\" field of UPDATE message is null, if you are using Debezium Postgres Connector, "
- + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.")
- .isInstanceOf(Exception.class);
- }
-
- @Test
- void testTombstoneMessages() throws Exception {
- DebeziumJsonDeserializationSchema deserializationSchema =
- new DebeziumJsonDeserializationSchema(
- PHYSICAL_DATA_TYPE,
- Collections.emptyList(),
- InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
- false,
- false,
- TimestampFormat.ISO_8601);
- SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
- deserializationSchema.deserialize(new byte[] {}, collector);
- assertThat(collector.list).isEmpty();
- }
-
- @Test
- void testDeserializationWithMetadata() throws Exception {
- testDeserializationWithMetadata(
- "debezium-data-schema-include.txt",
- true,
- row -> {
- assertThat(row.getInt(0)).isEqualTo(101);
- assertThat(row.getString(1).toString()).isEqualTo("scooter");
- assertThat(row.getString(2).toString()).isEqualTo("Small 2-wheel scooter");
- assertThat(row.getFloat(3)).isEqualTo(3.14f);
- assertThat(row.getString(4).toString())
- .startsWith(
- "{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},");
- assertThat(row.getTimestamp(5, 3).getMillisecond()).isEqualTo(1589355606100L);
- assertThat(row.getTimestamp(6, 3).getMillisecond()).isEqualTo(0L);
- assertThat(row.getString(7).toString()).isEqualTo("inventory");
- assertThat(row.isNullAt(8)).isEqualTo(true);
- assertThat(row.getString(9).toString()).isEqualTo("products");
- assertThat(row.getMap(10).size()).isEqualTo(14);
- });
-
- testDeserializationWithMetadata(
- "debezium-data-schema-exclude.txt",
- false,
- row -> {
- assertThat(row.getInt(0)).isEqualTo(101);
- assertThat(row.getString(1).toString()).isEqualTo("scooter");
- assertThat(row.getString(2).toString()).isEqualTo("Small 2-wheel scooter");
- assertThat(row.getFloat(3)).isEqualTo(3.14f);
- assertThat(row.isNullAt(4)).isEqualTo(true);
- assertThat(row.getTimestamp(5, 3).getMillisecond()).isEqualTo(1589355606100L);
- assertThat(row.getTimestamp(6, 3).getMillisecond()).isEqualTo(0L);
- assertThat(row.getString(7).toString()).isEqualTo("inventory");
- assertThat(row.isNullAt(8)).isEqualTo(true);
- assertThat(row.getString(9).toString()).isEqualTo("products");
- assertThat(row.getMap(10).size()).isEqualTo(14);
- });
-
- testDeserializationWithMetadata(
- "debezium-postgres-data-schema-exclude.txt",
- false,
- row -> {
- assertThat(row.getInt(0)).isEqualTo(101);
- assertThat(row.getString(1).toString()).isEqualTo("scooter");
- assertThat(row.getString(2).toString()).isEqualTo("Small 2-wheel scooter");
- assertThat(row.getFloat(3)).isEqualTo(3.14f);
- assertThat(row.isNullAt(4)).isEqualTo(true);
- assertThat(row.getTimestamp(5, 3).getMillisecond()).isEqualTo(1596001099434L);
- assertThat(row.getTimestamp(6, 3).getMillisecond()).isEqualTo(1596001099434L);
- assertThat(row.getString(7).toString()).isEqualTo("postgres");
- assertThat(row.getString(8).toString()).isEqualTo("inventory");
- assertThat(row.getString(9).toString()).isEqualTo("products");
- assertThat(row.getMap(10).size()).isEqualTo(11);
- });
- }
-
- private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
- throws Exception {
- List<String> lines = readLines(resourceFile);
- DebeziumJsonDeserializationSchema deserializationSchema =
- new DebeziumJsonDeserializationSchema(
- PHYSICAL_DATA_TYPE,
- Collections.emptyList(),
- InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
- schemaInclude,
- false,
- TimestampFormat.ISO_8601);
- open(deserializationSchema);
-
- SimpleCollector collector = new SimpleCollector();
- for (String line : lines) {
- deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
- }
-
- // Debezium captures change data (`debezium-data-schema-include.txt`) on the `product`
- // table:
- //
- // CREATE TABLE product (
- // id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- // name VARCHAR(255),
- // description VARCHAR(512),
- // weight FLOAT
- // );
- // ALTER TABLE product AUTO_INCREMENT = 101;
- //
- // INSERT INTO product
- // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
- // (default,"car battery","12V car battery",8.1),
- // (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40
- // to #3",0.8),
- // (default,"hammer","12oz carpenter's hammer",0.75),
- // (default,"hammer","14oz carpenter's hammer",0.875),
- // (default,"hammer","16oz carpenter's hammer",1.0),
- // (default,"rocks","box of assorted rocks",5.3),
- // (default,"jacket","water resistent black wind breaker",0.1),
- // (default,"spare tire","24 inch spare tire",22.2);
- // UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
- // UPDATE product SET weight='5.1' WHERE id=107;
- // INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
- // INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
- // UPDATE product SET description='new water resistent white wind breaker', weight='0.5'
- // WHERE id=110;
- // UPDATE product SET weight='5.17' WHERE id=111;
- // DELETE FROM product WHERE id=111;
- List<String> expected =
- Arrays.asList(
- "+I(101,scooter,Small 2-wheel scooter,3.14)",
- "+I(102,car battery,12V car battery,8.1)",
- "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
- "+I(104,hammer,12oz carpenter's hammer,0.75)",
- "+I(105,hammer,14oz carpenter's hammer,0.875)",
- "+I(106,hammer,16oz carpenter's hammer,1.0)",
- "+I(107,rocks,box of assorted rocks,5.3)",
- "+I(108,jacket,water resistent black wind breaker,0.1)",
- "+I(109,spare tire,24 inch spare tire,22.2)",
- "-U(106,hammer,16oz carpenter's hammer,1.0)",
- "+U(106,hammer,18oz carpenter hammer,1.0)",
- "-U(107,rocks,box of assorted rocks,5.3)",
- "+U(107,rocks,box of assorted rocks,5.1)",
- "+I(110,jacket,water resistent white wind breaker,0.2)",
- "+I(111,scooter,Big 2-wheel scooter ,5.18)",
- "-U(110,jacket,water resistent white wind breaker,0.2)",
- "+U(110,jacket,new water resistent white wind breaker,0.5)",
- "-U(111,scooter,Big 2-wheel scooter ,5.18)",
- "+U(111,scooter,Big 2-wheel scooter ,5.17)",
- "-D(111,scooter,Big 2-wheel scooter ,5.17)");
- List<String> actual =
- collector.list.stream().map(Object::toString).collect(Collectors.toList());
- assertThat(actual).isEqualTo(expected);
-
- DebeziumJsonSerializationSchema serializationSchema =
- new DebeziumJsonSerializationSchema(
- (RowType) PHYSICAL_DATA_TYPE.getLogicalType(),
- TimestampFormat.SQL,
- JsonFormatOptions.MapNullKeyMode.LITERAL,
- "null",
- true);
-
- open(serializationSchema);
- actual = new ArrayList<>();
- for (RowData rowData : collector.list) {
- actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
- }
-
- expected =
- Arrays.asList(
- "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
- "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
- "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
- "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
- "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
- "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
- "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
- "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
- "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
- "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
- assertThat(actual).isEqualTo(expected);
- }
-
- private void testDeserializationWithMetadata(
- String resourceFile, boolean schemaInclude, Consumer<RowData> testConsumer)
- throws Exception {
- // we only read the first line for keeping the test simple
- final String firstLine = readLines(resourceFile).get(0);
-
- final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
-
- final DataType producedDataType =
- DataTypeUtils.appendRowFields(
- PHYSICAL_DATA_TYPE,
- requestedMetadata.stream()
- .map(m -> DataTypes.FIELD(m.key, m.dataType))
- .collect(Collectors.toList()));
-
- final DebeziumJsonDeserializationSchema deserializationSchema =
- new DebeziumJsonDeserializationSchema(
- PHYSICAL_DATA_TYPE,
- requestedMetadata,
- InternalTypeInfo.of(producedDataType.getLogicalType()),
- schemaInclude,
- false,
- TimestampFormat.ISO_8601);
- open(deserializationSchema);
-
- final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(firstLine.getBytes(StandardCharsets.UTF_8), collector);
-
- assertThat(collector.list).hasSize(1);
- assertThat(collector.list.get(0)).satisfies(testConsumer);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- private static List<String> readLines(String resource) throws IOException {
- final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
- assert url != null;
- Path path = new File(url.getFile()).toPath();
- return Files.readAllLines(path);
- }
-
- private static class SimpleCollector implements Collector<RowData> {
-
- private List<RowData> list = new ArrayList<>();
-
- @Override
- public void collect(RowData record) {
- list.add(record);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats-kafka/flink-json-debezium/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
deleted file mode 100644
index 2899913..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/archunit.properties b/flink-formats-kafka/flink-json-debezium/src/test/resources/archunit.properties
deleted file mode 100644
index 15be88c..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/archunit.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# By default we allow removing existing violations, but fail when new violations are added.
-freeze.store.default.allowStoreUpdate=true
-
-# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
-#freeze.store.default.allowStoreCreation=true
-
-# Enable this to add allow new violations to be recorded.
-# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
-# violation, please try to avoid creating the violation. If the violation was created due to a
-# shortcoming of the rule, file a JIRA issue so the rule can be improved.
-#freeze.refreeze=true
-
-freeze.store.default.path=archunit-violations
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data-filter-table.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data-filter-table.txt
deleted file mode 100644
index 68661d7..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data-filter-table.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"}
-{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"mydb","es":1598944202000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"description":null}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944202218,"type":"UPDATE"}
-{"data":null,"database":"mydb","es":1598944271000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE orders (\n order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n order_date DATE NOT NULL,\n purchaser INTEGER NOT NULL,\n quantity INTEGER NOT NULL,\n product_id INTEGER NOT NULL\n) AUTO_INCREMENT = 10001","sqlType":null,"table":"orders","ts":1598944271192,"type":"CREATE"}
-{"data":[{"order_number":"10001","order_date":"2016-01-16","purchaser":"1001","quantity":"1","product_id":"102"},{"order_number":"10002","order_date":"2016-01-17","purchaser":"1002","quantity":"2","product_id":"105"},{"order_number":"10003","order_date":"2016-02-19","purchaser":"1002","quantity":"2","product_id":"106"},{"order_number":"10004","order_date":"2016-02-21","purchaser":"1003","quantity":"1","product_id":"107"}],"database":"mydb","es":1598944275000,"id":4,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":null,"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944275018,"type":"INSERT"}
-{"data":[{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.1"}],"database":"mydb","es":1598944279000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"5.3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944279665,"type":"UPDATE"}
-{"data":[{"id":"110","name":"jacket","description":"water resistent white wind breaker","weight":"0.2"}],"database":"mydb","es":1598944288000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944288394,"type":"INSERT"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.18"}],"database":"mydb","es":1598944288000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944288394,"type":"INSERT"}
-{"data":[{"id":"110","name":"jacket","description":"new water resistent white wind breaker","weight":"0.5"}],"database":"mydb","es":1598944288000,"id":7,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"description":"water resistent white wind breaker","weight":"0.2"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944288717,"type":"UPDATE"}
-{"data":[{"order_number":"10001","order_date":"2016-01-16","purchaser":"1001","quantity":"3","product_id":"102"}],"database":"mydb","es":1598944331000,"id":8,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":[{"quantity":"1"}],"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944331870,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":9,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"5.18"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337341,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":9,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337341,"type":"DELETE"}
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"5.17"},{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":10,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"3.14"},{"weight":"8.1"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337663,"type":"UPDATE"}
-{"data":[{"order_number":"10002","order_date":"2016-01-17","purchaser":"1002","quantity":"2","product_id":"105"}],"database":"mydb","es":1598944374000,"id":11,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":null,"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944374999,"type":"DELETE"}
-{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"mydb","es":1598944418000,"id":12,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944418418,"type":"DELETE"}
-{"data":null,"database":"mydb","es":1598944271000,"id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE project (\n id VARCHAR(255) NOT NULL,\n name VARCHAR(255) NOT NULL,\n description VARCHAR(255) NOT NULL,\n weight FLOAT NOT NULL\n)","sqlType":null,"table":"projects","ts":1598944271192,"type":"CREATE"}
-{"data":[{"id":"A101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"A102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"A103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"A104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"A105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"A106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"A107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"A108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"A109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":14,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"project","ts":1598944146308,"type":"INSERT"}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data.txt
deleted file mode 100644
index a83b7da..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/canal-data.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"inventory","es":1589373515000,"id":3,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373515477,"type":"INSERT"}
-{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"inventory","es":1589373546000,"id":4,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"description":null}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373546301,"type":"UPDATE"}
-{"data":[{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.1"}],"database":"inventory","es":1589373549000,"id":5,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373549489,"type":"UPDATE"}
-{"data":[{"id":"110","name":"jacket","description":"water resistent white wind breaker","weight":"0.2"}],"database":"inventory","es":1589373552000,"id":6,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373552882,"type":"INSERT"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.18"}],"database":"inventory","es":1589373555000,"id":7,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373555457,"type":"INSERT"}
-{"data":[{"id":"110","name":"jacket","description":"new water resistent white wind breaker","weight":"0.5"}],"database":"inventory","es":1589373558000,"id":8,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"description":"water resistent white wind breaker","weight":"0.2"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373558230,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"inventory","es":1589373560000,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.18"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373560798,"type":"UPDATE"}
-{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"inventory","es":1589373563000,"id":10,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373563798,"type":"DELETE"}
-{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"5.17"},{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"}],"database":"inventory","es":1589373753000,"id":11,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"3.14"},{"weight":"8.1"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373753939,"type":"UPDATE"}
-{"data":null,"database":"inventory","es":1589373566000,"id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `xj_`.`user02` (`uid` int(0) NOT NULL,`uname` varchar(255) NULL, PRIMARY KEY (`uid`))","sqlType":null,"table":"user02","ts":1589373566000,"type":"CREATE"}
-{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"inventory","es":1589374013000,"id":12,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589374013680,"type":"DELETE"}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/complex-schema.json b/flink-formats-kafka/flink-json-debezium/src/test/resources/complex-schema.json
deleted file mode 100644
index 8666428..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/complex-schema.json
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
- "$schema": "http://json-schema.org/draft-06/schema#",
- "description": "A representation of a person, company, organization, or place",
- "type": "object",
- "required": [
- "familyName",
- "givenName"
- ],
- "properties": {
- "fn": {
- "description": "Formatted Name",
- "type": "string"
- },
- "familyName": {
- "type": "string"
- },
- "additionalName": {
- "type": "boolean"
- },
- "tuples": {
- "type": "array",
- "items": [
- {
- "type": "number"
- },
- {
- "type": "string"
- },
- {
- "type": "string",
- "enum": [
- "Street",
- "Avenue",
- "Boulevard"
- ]
- },
- {
- "type": "string",
- "enum": [
- "NW",
- "NE",
- "SW",
- "SE"
- ]
- }
- ],
- "additionalItems": false
- },
- "honorificPrefix": {
- "type": "array",
- "items": {
- "type": "string"
- }
- },
- "url": {
- "type": "string",
- "format": "uri"
- },
- "email": {
- "type": "object",
- "properties": {
- "type": {
- "type": "string"
- },
- "value": {
- "type": "string",
- "format": "email"
- }
- }
- },
- tel: {
- "type": "object",
- "properties": {
- "type": {
- "type": "integer"
- },
- "value": {
- "type": "string",
- "format": "phone"
- }
- }
- },
- "sound": {
- "type": "null"
- },
- "org": {
- "type": "object",
- "properties": {
- "organizationUnit": {
- "type": "object",
- "properties": {}
- }
- }
- }
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-exclude.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-exclude.txt
deleted file mode 100644
index 3763369..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-exclude.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
-{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
-{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
-{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
-{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-include.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-include.txt
deleted file mode 100644
index b3e0f7d..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-data-schema-include.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-replica-identity.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-replica-identity.txt
deleted file mode 100644
index d4fcb88..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-replica-identity.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099434,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099434,"transaction":null}
-{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099438,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010889629,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":602,"lsn":34131104,"xmin":null},"op":"u","ts_ms":1596010890411,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010930407,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":603,"lsn":34132200,"xmin":null},"op":"u","ts_ms":1596010930623,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010946488,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":604,"lsn":34132560,"xmin":null},"op":"c","ts_ms":1596010946870,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010976756,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":605,"lsn":34133072,"xmin":null},"op":"c","ts_ms":1596010976880,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010982228,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":606,"lsn":34133344,"xmin":null},"op":"u","ts_ms":1596010982481,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010985627,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":607,"lsn":34133584,"xmin":null},"op":"u","ts_ms":1596010986047,"transaction":null}
-{"before":null,"after":null,"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010988168,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":608,"lsn":34133800,"xmin":null},"op":"d","ts_ms":1596010988596,"transaction":null}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-exclude.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-exclude.txt
deleted file mode 100644
index 993f5f4..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-exclude.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099434,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099434,"transaction":null}
-{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}
-{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}
-{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}
-{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099438,"transaction":null}
-{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010889629,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":602,"lsn":34131104,"xmin":null},"op":"u","ts_ms":1596010890411,"transaction":null}
-{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010930407,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":603,"lsn":34132200,"xmin":null},"op":"u","ts_ms":1596010930623,"transaction":null}
-{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010946488,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":604,"lsn":34132560,"xmin":null},"op":"c","ts_ms":1596010946870,"transaction":null}
-{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010976756,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":605,"lsn":34133072,"xmin":null},"op":"c","ts_ms":1596010976880,"transaction":null}
-{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010982228,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":606,"lsn":34133344,"xmin":null},"op":"u","ts_ms":1596010982481,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010985627,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":607,"lsn":34133584,"xmin":null},"op":"u","ts_ms":1596010986047,"transaction":null}
-{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"after":null,"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010988168,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":608,"lsn":34133800,"xmin":null},"op":"d","ts_ms":1596010988596,"transaction":null}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-include.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-include.txt
deleted file mode 100644
index 8301935..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/debezium-postgres-data-schema-include.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099434,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099434,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099435,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099435,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099436,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099436,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099437,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596001099437,"snapshot":"true","db":"postgres","schema":"inventory","table":"products","txId":601,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1596001099438,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010889629,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":602,"lsn":34131104,"xmin":null},"op":"u","ts_ms":1596010890411,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010930407,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":603,"lsn":34132200,"xmin":null},"op":"u","ts_ms":1596010930623,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010946488,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":604,"lsn":34132560,"xmin":null},"op":"c","ts_ms":1596010946870,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010976756,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":605,"lsn":34133072,"xmin":null},"op":"c","ts_ms":1596010976880,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010982228,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":606,"lsn":34133344,"xmin":null},"op":"u","ts_ms":1596010982481,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010985627,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":607,"lsn":34133584,"xmin":null},"op":"u","ts_ms":1596010986047,"transaction":null}}
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"fullfillment.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"fullfillment.inventory.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"after":null,"source":{"version":"1.2.1.Final","connector":"postgresql","name":"fullfillment","ts_ms":1596010988168,"snapshot":"false","db":"postgres","schema":"inventory","table":"products","txId":608,"lsn":34133800,"xmin":null},"op":"d","ts_ms":1596010988596,"transaction":null}}
\ No newline at end of file
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/maxwell-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/maxwell-data.txt
deleted file mode 100644
index 2d33ff7..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/maxwell-data.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"commit":true,"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684893,"xid":7152,"commit":true,"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"old":{"description":"16oz carpenter's hammer"},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684897,"xid":7169,"commit":true,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"old":{"weight":5.3},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684900,"xid":7186,"commit":true,"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"insert","ts":1596684904,"xid":7201,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"old":{"description":"water resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684912,"xid":7235,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"old":{"weight":5.18},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"delete","ts":1596684914,"xid":7250,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":5.17},"old":{"weight":3.14},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"xoffset":0,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"primary_key_columns": ["id"]}
-{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"commit":true,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt b/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
deleted file mode 100644
index d295925..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/ogg-data.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","primary_keys":["id"],"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000144","primary_keys":["id"],"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000145","primary_keys":["id"],"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000146","primary_keys":["id"],"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000147","primary_keys":["id"],"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000148","primary_keys":["id"],"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000149","primary_keys":["id"],"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000150","primary_keys":["id"],"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000151","primary_keys":["id"],"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000152","primary_keys":["id"],"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op_type":"U","op_ts":"2020-05-13 17:26:27.936000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000153","primary_keys":["id"],"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op_type":"U","op_ts":"2020-05-13 17:28:19.505000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000154","primary_keys":["id"],"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","op_ts":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op_type":"I","op_ts":"2020-05-13 17:30:10.230000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000155","primary_keys":["id"],"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op_type":"I","op_ts":"2020-05-13 17:30:43.428000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000156","primary_keys":["id"],"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op_type":"U","op_ts":"2020-05-13 17:32:20.327000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000157","primary_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op_type":"U","op_ts":"2020-05-13 17:32:10.904000"}
-{"table":"OGG.TBL_TEST","pos":"00000000000000000000158","primary_keys":["id"],"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"op_type":"D","op_ts":"2020-05-13 17:32:24.455000"}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/resources/reference-schema.json b/flink-formats-kafka/flink-json-debezium/src/test/resources/reference-schema.json
deleted file mode 100644
index 99e0e79..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/test/resources/reference-schema.json
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
- "$schema": "http://json-schema.org/draft-04/schema#",
- "definitions": {
- "address": {
- "type": "object",
- "properties": {
- "street_address": {
- "type": "string"
- },
- "city": {
- "type": "string"
- },
- "state": {
- "type": "string"
- }
- },
- "required": [
- "street_address",
- "city",
- "state"
- ]
- }
- },
- "type": "object",
- "properties": {
- "billing_address": {
- "$ref": "#/definitions/address"
- },
- "shipping_address": {
- "$ref": "#/definitions/address"
- },
- "optional_address": {
- "oneOf": [
- {
- "type": "null"
- },
- {
- "$ref": "#/definitions/address"
- }
- ]
- }
- }
-}
diff --git a/flink-formats-kafka/pom.xml b/flink-formats-kafka/pom.xml
index dbe67f5..8ab527e 100644
--- a/flink-formats-kafka/pom.xml
+++ b/flink-formats-kafka/pom.xml
@@ -35,7 +35,6 @@
<modules>
<module>flink-avro-confluent-registry</module>
- <module>flink-json-debezium</module>
<module>flink-sql-avro-confluent-registry</module>
</modules>