Add the multi version schema support (#3876)

### Motivation

Fix #3742

In order to decode the message correctly by AVRO schema, we need to know the schema what the message is.

### Modification

- Introduced Schema Reader and Schema Writer for StructSchema.
   - Reader is used to decode message
   - Writer is used to encode message
- The implementations of StructSchema, provides their schema reader and writer implementations. 
- Introduced a schema reader cache for caching the readers for different schema versions. 
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
index 984f658..58388ef 100644
--- a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.io.hbase.sink;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -31,10 +29,12 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Record;
@@ -52,12 +52,15 @@
 import java.util.Map;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * hbase Sink test
  */
 @Slf4j
 public class HbaseGenericRecordSinkTest {
+    private Message<GenericRecord> message;
+
 
     /**
      * A Simple class to test hbase class
@@ -84,6 +87,9 @@
 
     @Test(enabled = false)
     public void TestOpenAndWriteSink() throws Exception {
+        message = mock(MessageImpl.class);
+        GenericSchema<GenericRecord> genericAvroSchema;
+
         Map<String, Object> map = new HashMap<>();
         map.put("zookeeperQuorum", "localhost");
         map.put("zookeeperClientPort", "2181");
@@ -112,13 +118,12 @@
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
         AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
         autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
         PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
         Consumer consumer = mock(Consumer.class);
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "11:111", map, payload, autoConsumeSchema);
+
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
@@ -136,6 +141,11 @@
             })
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
+
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
                 obj.toString(),
                 message.getValue().toString(),
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index c95a9ed..fe8941e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -27,9 +27,11 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
@@ -57,6 +59,8 @@
 @Slf4j
 public class InfluxDBGenericRecordSinkTest {
 
+    private Message<GenericRecord> message;
+
     /**
      * A Simple class to test InfluxDB class
      */
@@ -107,6 +111,8 @@
 
     @Test
     public void testOpenAndWrite() throws Exception {
+        message = mock(MessageImpl.class);
+        GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a cpu Record
         Cpu cpu = new Cpu();
         cpu.setMeasurement("cpu");
@@ -121,17 +127,19 @@
         AvroSchema<Cpu> schema = AvroSchema.of(Cpu.class);
 
         byte[] bytes = schema.encode(cpu);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
         AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
         autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("influx_cpu", "77:777",
-            configMap, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("influx_cpu")
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
+
         log.info("cpu:{}, Message.getValue: {}, record.getValue: {}",
             cpu.toString(),
             message.getValue().toString(),
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 9ecc91a..aec9d7e 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -20,8 +20,7 @@
 package org.apache.pulsar.io.jdbc;
 
 import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+
 import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -29,11 +28,11 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.testng.Assert;
@@ -41,12 +40,16 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Jdbc Sink test
  */
 @Slf4j
 public class JdbcSinkTest {
     private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+    private Message<GenericRecord> message;
 
     /**
      * A Simple class to test jdbc class
@@ -72,9 +75,11 @@
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
+        message = mock(MessageImpl.class);
         JdbcAutoSchemaSink jdbcSink;
         Map<String, Object> conf;
         String tableName = "TestOpenAndWriteSink";
+        GenericSchema<GenericRecord> genericAvroSchema;
 
         String jdbcUrl = sqliteUtils.sqliteUri();
         conf = Maps.newHashMap();
@@ -99,16 +104,16 @@
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
-        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
             obj.toString(),
             message.getValue().toString(),
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
index 72462e7..92d1f4f 100644
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
@@ -18,17 +18,17 @@
  */
 package org.apache.pulsar.io.solr;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
@@ -39,6 +39,9 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * solr Sink test
  */
@@ -46,6 +49,7 @@
 public class SolrGenericRecordSinkTest {
 
     private SolrServerUtil solrServerUtil;
+    private Message<GenericRecord> message;
 
     /**
      * A Simple class to test solr class
@@ -71,6 +75,7 @@
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
+        message = mock(MessageImpl.class);
         Map<String, Object> configs = new HashMap<>();
         configs.put("solrUrl", "http://localhost:8983/solr");
         configs.put("solrMode", "Standalone");
@@ -78,6 +83,7 @@
         configs.put("solrCommitWithinMs", "100");
         configs.put("username", "");
         configs.put("password", "");
+        GenericSchema<GenericRecord> genericAvroSchema;
 
         SolrGenericRecordSink sink = new SolrGenericRecordSink();
 
@@ -88,16 +94,19 @@
         AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
 
         byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
         AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
         autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
+
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
             obj.toString(),
             message.getValue().toString(),