[Pulsar IO][Issue 5633]Support avro schema for debezium connector (#6034)

Fixes #5633 


### Motivation

Currently, some users want to support Avro schema in debezium, so this pr supports this feature.
For Kafka's Avro schema, it depends on the Avro 1.8 version, but Avro version has just been upgraded to 1.9 in pulsar, so shade is needed to avoid finding `addProp` function

### Modifications

* Add a package `kafka-connect-avro-converter-shaded`
* Add class KafkaSchema to converter Kafka's Avro schema to pulsar's schema

### Verifying this change 

Unit test and integration tests
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 7305849..a66f635 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -29,7 +29,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.GenericContainer;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 70e9f45..07262ee 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -33,6 +33,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -42,8 +43,10 @@
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
@@ -63,15 +66,18 @@
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.assertj.core.api.Assertions;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.com.google.common.collect.Sets;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -81,6 +87,7 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -140,18 +147,24 @@
     }
 
     @Test(groups = "source")
-    public void testDebeziumMySqlSource() throws Exception {
-        testDebeziumMySqlConnect();
+    public void testDebeziumMySqlSourceJson() throws Exception {
+        testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
+    }
+
+    @Test(groups = "source")
+    public void testDebeziumMySqlSourceAvro() throws Exception {
+        testDebeziumMySqlConnect(
+                "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false);
     }
 
     @Test(groups = "source")
     public void testDebeziumPostgreSqlSource() throws Exception {
-        testDebeziumPostgreSqlConnect();
+        testDebeziumPostgreSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
     }
 
     @Test(groups = "source")
     public void testDebeziumMongoDbSource() throws Exception{
-        testDebeziumMongoDbConnect();
+        testDebeziumMongoDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
     }
 
     private void testSink(SinkTester tester, boolean builtin) throws Exception {
@@ -2262,15 +2275,17 @@
         getFunctionInfoNotFound(functionName);
     }
 
-    private  void testDebeziumMySqlConnect()
-        throws Exception {
+    private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "public/default/dbserver1.inventory.products";
-        final String sourceName = "test-source-connector-"
-            + functionRuntimeType + "-name-" + randomName(8);
+        boolean isJsonConverter = converterClassName.endsWith("JsonConverter");
+        final String consumeTopicName = "debezium/mysql-"
+                + (isJsonConverter ? "json" : "avro")
+                + "/dbserver1.inventory.products";
+        final String sourceName = "test-source-debezium-mysql" + (isJsonConverter ? "json" : "avro")
+                + "-" + functionRuntimeType + "-" + randomName(8);
 
         // This is the binlog count that contained in mysql container.
         final int numMessages = 47;
@@ -2287,28 +2302,21 @@
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+        initNamespace(admin);
+
         try {
-            // If topic already exists, we should delete it so as not to affect the following tests.
-            admin.topics().getStats(consumeTopicName);
-            admin.topics().delete(consumeTopicName);
-            admin.schemas().deleteSchema(consumeTopicName);
-        } catch (PulsarAdminException e) {
-            // Expected results, ignoring the exception
-            log.info("Topic: {} does not exist, we can continue the following tests. Exceptions message: {}",
+            SchemaInfo lastSchemaInfo = admin.schemas().getSchemaInfo(consumeTopicName);
+            log.info("lastSchemaInfo: {}", lastSchemaInfo == null ? "null" : lastSchemaInfo.toString());
+        } catch (Exception e) {
+            log.warn("failed to get schemaInfo for topic: {}, exceptions message: {}",
                     consumeTopicName, e.getMessage());
         }
-        admin.topics().createNonPartitionedTopic(consumeTopicName);
+
         admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
-            .topic(consumeTopicName)
-            .subscriptionName("debezium-source-tester")
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscribe();
-
-        @Cleanup
-        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);
+        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium mysql server
         DebeziumMySQLContainer mySQLContainer = new DebeziumMySQLContainer(pulsarCluster.getClusterName());
@@ -2330,26 +2338,35 @@
         Failsafe.with(statusRetryPolicy).run(() ->
                 waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
+        @Cleanup
+        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
+                .topic(consumeTopicName)
+                .subscriptionName("debezium-source-tester")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
+
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null);
+        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
 
         // prepare insert event
         sourceTester.prepareInsertEvent();
 
         // validate the source insert event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
 
         // prepare update event
         sourceTester.prepareUpdateEvent();
 
         // validate the source update event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
 
         // prepare delete event
         sourceTester.prepareDeleteEvent();
 
         // validate the source delete event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
@@ -2358,14 +2375,14 @@
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
-    private  void testDebeziumPostgreSqlConnect() throws Exception {
+    private  void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "public/default/dbserver1.inventory.products";
-        final String sourceName = "test-source-connector-"
-                + functionRuntimeType + "-name-" + randomName(8);
+        final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
+        final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);
+
 
         // This is the binlog count that contained in postgresql container.
         final int numMessages = 26;
@@ -2382,21 +2399,13 @@
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
-        try {
-            // If topic already exists, we should delete it so as not to affect the following tests.
-            admin.topics().getStats(consumeTopicName);
-            admin.topics().delete(consumeTopicName);
-            admin.schemas().deleteSchema(consumeTopicName);
-        } catch (PulsarAdminException e) {
-            // Expected results, ignoring the exception
-            log.info("Topic: {} does not exist, we can continue the following tests. Exceptions message: {}",
-                    consumeTopicName, e.getMessage());
-        }
+        initNamespace(admin);
+
         admin.topics().createNonPartitionedTopic(consumeTopicName);
         admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
+        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
                 .topic(consumeTopicName)
                 .subscriptionName("debezium-source-tester")
                 .subscriptionType(SubscriptionType.Exclusive)
@@ -2404,6 +2413,7 @@
 
         @Cleanup
         DebeziumPostgreSqlSourceTester sourceTester = new DebeziumPostgreSqlSourceTester(pulsarCluster);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium postgresql server
         DebeziumPostgreSqlContainer postgreSqlContainer = new DebeziumPostgreSqlContainer(pulsarCluster.getClusterName());
@@ -2426,25 +2436,25 @@
                 waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null);
+        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
 
         // prepare insert event
         sourceTester.prepareInsertEvent();
 
         // validate the source insert event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
 
         // prepare update event
         sourceTester.prepareUpdateEvent();
 
         // validate the source update event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
 
         // prepare delete event
         sourceTester.prepareDeleteEvent();
 
         // validate the source delete event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
@@ -2453,12 +2463,12 @@
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
-    private  void testDebeziumMongoDbConnect() throws Exception {
+    private  void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "public/default/dbserver1.inventory.products";
+        final String consumeTopicName = "debezium/mongodb/dbserver1.inventory.products";
         final String sourceName = "test-source-connector-"
                 + functionRuntimeType + "-name-" + randomName(8);
 
@@ -2477,21 +2487,13 @@
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
-        try {
-            // If topic already exists, we should delete it so as not to affect the following tests.
-            admin.topics().getStats(consumeTopicName);
-            admin.topics().delete(consumeTopicName);
-            admin.schemas().deleteSchema(consumeTopicName);
-        } catch (PulsarAdminException e) {
-            // Expected results, ignoring the exception
-            log.info("Topic: {} does not exist, we can continue the following tests. Exceptions message: {}",
-                    consumeTopicName, e.getMessage());
-        }
+        initNamespace(admin);
+
         admin.topics().createNonPartitionedTopic(consumeTopicName);
         admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
+        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
                 .topic(consumeTopicName)
                 .subscriptionName("debezium-source-tester")
                 .subscriptionType(SubscriptionType.Exclusive)
@@ -2499,6 +2501,7 @@
 
         @Cleanup
         DebeziumMongoDbSourceTester sourceTester = new DebeziumMongoDbSourceTester(pulsarCluster);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium mongodb server
         DebeziumMongoDbContainer mongoDbContainer = new DebeziumMongoDbContainer(pulsarCluster.getClusterName());
@@ -2520,25 +2523,25 @@
                 waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null);
+        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
 
         // prepare insert event
         sourceTester.prepareInsertEvent();
 
         // validate the source insert event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
 
         // prepare update event
         sourceTester.prepareUpdateEvent();
 
         // validate the source update event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
 
         // prepare delete event
         sourceTester.prepareDeleteEvent();
 
         // validate the source delete event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
@@ -2547,4 +2550,27 @@
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
+    private void initNamespace(PulsarAdmin admin) {
+        log.info("[initNamespace] start.");
+        try {
+            admin.tenants().createTenant("debezium", new TenantInfo(Sets.newHashSet(),
+                    Sets.newHashSet(pulsarCluster.getClusterName())));
+            admin.namespaces().createNamespace("debezium/mysql-json");
+            admin.namespaces().createNamespace("debezium/mysql-avro");
+            admin.namespaces().createNamespace("debezium/mongodb");
+            admin.namespaces().createNamespace("debezium/postgresql");
+        } catch (Exception e) {
+            log.info("[initNamespace] msg: {}", e.getMessage());
+        }
+        log.info("[initNamespace] finish.");
+    }
+
+    private Schema getSchema(boolean jsonWithEnvelope) {
+        if (jsonWithEnvelope) {
+            return KeyValueSchema.kvBytes();
+        } else {
+            return KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
+        }
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
index 23b5db4..6fa35ef 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
@@ -50,6 +50,7 @@
         sourceConfig.put("mongodb.task.id","1");
         sourceConfig.put("database.whitelist", "inventory");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("topic.namespace", "debezium/mongodb");
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 3287e2b..4b66506 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -48,7 +48,7 @@
 
     private final PulsarCluster pulsarCluster;
 
-    public DebeziumMySqlSourceTester(PulsarCluster cluster) {
+    public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName) {
         super(NAME);
         this.pulsarCluster = cluster;
         pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
@@ -61,6 +61,10 @@
         sourceConfig.put("database.server.name", "dbserver1");
         sourceConfig.put("database.whitelist", "inventory");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("key.converter", converterClassName);
+        sourceConfig.put("value.converter", converterClassName);
+        sourceConfig.put("topic.namespace", "debezium/mysql-" +
+                (converterClassName.endsWith("AvroConverter") ? "avro" : "json"));
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index e0efff2..a8deb4c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -63,6 +63,7 @@
         sourceConfig.put("schema.whitelist", "inventory");
         sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("topic.namespace", "debezium/postgresql");
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index 5040de5..27037ab 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.Assert;
@@ -46,6 +50,14 @@
     protected final String sourceType;
     protected final Map<String, Object> sourceConfig;
 
+    public final static Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
+        add("before");
+        add("after");
+        add("source");
+        add("op");
+        add("ts_ms");
+    }};
+
     protected SourceTester(String sourceType) {
         this.sourceType = sourceType;
         this.sourceConfig = Maps.newHashMap();
@@ -71,7 +83,16 @@
 
     public abstract Map<String, String> produceSourceMessages(int numMessages) throws Exception;
 
-    public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
+    public void validateSourceResult(Consumer consumer, int number,
+                                     String eventType, String converterClassName) throws Exception {
+        if (converterClassName.endsWith("AvroConverter")) {
+            validateSourceResultAvro(consumer, number, eventType);
+        } else {
+            validateSourceResultJson(consumer, number, eventType);
+        }
+    }
+
+    public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
         int recordsNumber = 0;
         Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
         while(msg != null) {
@@ -82,7 +103,7 @@
             Assert.assertTrue(key.contains(this.keyContains()));
             Assert.assertTrue(value.contains(this.valueContains()));
             if (eventType != null) {
-                Assert.assertTrue(value.contains(this.eventContains(eventType)));
+                Assert.assertTrue(value.contains(this.eventContains(eventType, true)));
             }
             consumer.acknowledge(msg);
             msg = consumer.receive(1, TimeUnit.SECONDS);
@@ -91,20 +112,51 @@
         Assert.assertEquals(recordsNumber, number);
         log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
     }
+
+    public void validateSourceResultAvro(Consumer<KeyValue<GenericRecord, GenericRecord>> consumer,
+                                     int number, String eventType) throws Exception {
+        int recordsNumber = 0;
+        Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive(2, TimeUnit.SECONDS);
+        while(msg != null) {
+            recordsNumber ++;
+            GenericRecord keyRecord = msg.getValue().getKey();
+            Assert.assertNotNull(keyRecord.getFields());
+            Assert.assertTrue(keyRecord.getFields().size() > 0);
+
+            GenericRecord valueRecord = msg.getValue().getValue();
+            Assert.assertNotNull(valueRecord.getFields());
+            Assert.assertTrue(valueRecord.getFields().size() > 0);
+            for (Field field : valueRecord.getFields()) {
+                Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
+            }
+
+            if (eventType != null) {
+                String op = valueRecord.getField("op").toString();
+                Assert.assertEquals(this.eventContains(eventType, false), op);
+            }
+            consumer.acknowledge(msg);
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+        }
+
+        Assert.assertEquals(recordsNumber, number);
+        log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
+    }
+
     public String keyContains(){
         return "dbserver1.inventory.products.Key";
     }
+
     public String valueContains(){
         return "dbserver1.inventory.products.Value";
     }
 
-    public String eventContains(String eventType) {
+    public String eventContains(String eventType, boolean isJson) {
         if (eventType.equals(INSERT)) {
-            return "\"op\":\"c\"";
+            return isJson ? "\"op\":\"c\"" : "c";
         } else if (eventType.equals(UPDATE)) {
-            return "\"op\":\"u\"";
+            return isJson ? "\"op\":\"u\"" : "u";
         } else {
-            return "\"op\":\"d\"";
+            return isJson ? "\"op\":\"d\"" : "d";
         }
     }
 }