Add multi-version schema support (#3876)
### Motivation
Fix #3742
To decode a message correctly with an Avro schema, we need to know which schema version the message was written with.
### Modification
- Introduced Schema Reader and Schema Writer for StructSchema.
- The reader is used to decode messages.
- The writer is used to encode messages.
- Each StructSchema implementation provides its own schema reader and writer implementations.
- Introduced a schema reader cache for caching the readers for different schema versions.
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
index 984f658..58388ef 100644
--- a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.io.hbase.sink;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -31,10 +29,12 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
@@ -52,12 +52,15 @@
import java.util.Map;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* hbase Sink test
*/
@Slf4j
public class HbaseGenericRecordSinkTest {
+ private Message<GenericRecord> message;
+
/**
* A Simple class to test hbase class
@@ -84,6 +87,9 @@
@Test(enabled = false)
public void TestOpenAndWriteSink() throws Exception {
+ message = mock(MessageImpl.class);
+ GenericSchema<GenericRecord> genericAvroSchema;
+
Map<String, Object> map = new HashMap<>();
map.put("zookeeperQuorum", "localhost");
map.put("zookeeperClientPort", "2181");
@@ -112,13 +118,12 @@
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
Consumer consumer = mock(Consumer.class);
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "11:111", map, payload, autoConsumeSchema);
+
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
@@ -136,6 +141,11 @@
})
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
+
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index c95a9ed..fe8941e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -27,9 +27,11 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
@@ -57,6 +59,8 @@
@Slf4j
public class InfluxDBGenericRecordSinkTest {
+ private Message<GenericRecord> message;
+
/**
* A Simple class to test InfluxDB class
*/
@@ -107,6 +111,8 @@
@Test
public void testOpenAndWrite() throws Exception {
+ message = mock(MessageImpl.class);
+ GenericSchema<GenericRecord> genericAvroSchema;
// prepare a cpu Record
Cpu cpu = new Cpu();
cpu.setMeasurement("cpu");
@@ -121,17 +127,19 @@
AvroSchema<Cpu> schema = AvroSchema.of(Cpu.class);
byte[] bytes = schema.encode(cpu);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("influx_cpu", "77:777",
- configMap, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("influx_cpu")
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
+
log.info("cpu:{}, Message.getValue: {}, record.getValue: {}",
cpu.toString(),
message.getValue().toString(),
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 9ecc91a..aec9d7e 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -20,8 +20,7 @@
package org.apache.pulsar.io.jdbc;
import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -29,11 +28,11 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.testng.Assert;
@@ -41,12 +40,16 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Jdbc Sink test
*/
@Slf4j
public class JdbcSinkTest {
private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+ private Message<GenericRecord> message;
/**
* A Simple class to test jdbc class
@@ -72,9 +75,11 @@
@Test
public void TestOpenAndWriteSink() throws Exception {
+ message = mock(MessageImpl.class);
JdbcAutoSchemaSink jdbcSink;
Map<String, Object> conf;
String tableName = "TestOpenAndWriteSink";
+ GenericSchema<GenericRecord> genericAvroSchema;
String jdbcUrl = sqliteUtils.sqliteUri();
conf = Maps.newHashMap();
@@ -99,16 +104,16 @@
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
- AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
- autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
index 72462e7..92d1f4f 100644
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
@@ -18,17 +18,17 @@
*/
package org.apache.pulsar.io.solr;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
@@ -39,6 +39,9 @@
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* solr Sink test
*/
@@ -46,6 +49,7 @@
public class SolrGenericRecordSinkTest {
private SolrServerUtil solrServerUtil;
+ private Message<GenericRecord> message;
/**
* A Simple class to test solr class
@@ -71,6 +75,7 @@
@Test
public void TestOpenAndWriteSink() throws Exception {
+ message = mock(MessageImpl.class);
Map<String, Object> configs = new HashMap<>();
configs.put("solrUrl", "http://localhost:8983/solr");
configs.put("solrMode", "Standalone");
@@ -78,6 +83,7 @@
configs.put("solrCommitWithinMs", "100");
configs.put("username", "");
configs.put("password", "");
+ GenericSchema<GenericRecord> genericAvroSchema;
SolrGenericRecordSink sink = new SolrGenericRecordSink();
@@ -88,16 +94,19 @@
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
+
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),