feat: support multiple schema version for producer and consumer (#611)
* feat: support multiple schema version for producer and consumer
* Fix format the code and add some description for error
* fix: syntax error
* fix lint and CI error
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4cc1645..fac9d4b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
package pulsar
import (
+ "encoding/hex"
"errors"
"fmt"
"math"
@@ -144,11 +145,57 @@
nackTracker *negativeAcksTracker
dlq *dlqRouter
- log log.Logger
-
+ log log.Logger
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema, ok := s.cache[key]
+ s.lock.RUnlock()
+ if ok {
+ return schema, nil
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = internal.ConvertToStringMap(pbSchema.Properties)
+
+ schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+ if err != nil {
+ return nil, err
+ }
+ s.add(key, schema)
+ return schema, nil
+}
+
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.cache[schemaVersionHash] = schema
}
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
@@ -175,6 +222,7 @@
compressionProviders: sync.Map{},
dlq: dlq,
metrics: metrics,
+ schemaInfoCache: newSchemaInfoCache(client, options.topic),
}
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
@@ -687,6 +735,8 @@
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
+ schemaVersion: msgMeta.GetSchemaVersion(),
+ schemaInfoCache: pc.schemaInfoCache,
orderingKey: string(smm.OrderingKey),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
@@ -705,6 +755,8 @@
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
+ schemaVersion: msgMeta.GetSchemaVersion(),
+ schemaInfoCache: pc.schemaInfoCache,
index: messageIndex,
brokerPublishTime: brokerPublishTime,
}
@@ -1119,7 +1171,7 @@
keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
requestID := pc.client.rpcClient.NewRequestID()
- pbSchema := new(pb.Schema)
+ var pbSchema *pb.Schema
if pc.options.schema != nil && pc.options.schema.GetSchemaInfo() != nil {
tmpSchemaType := pb.Schema_Type(int32(pc.options.schema.GetSchemaInfo().Type))
@@ -1131,7 +1183,6 @@
}
pc.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
} else {
- pbSchema = nil
pc.log.Debug("The partition consumer schema is nil")
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 8248b1a..067439f 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -243,6 +243,8 @@
replicatedFrom string
redeliveryCount uint32
schema Schema
+ schemaVersion []byte
+ schemaInfoCache *schemaInfoCache
encryptionContext *EncryptionContext
index *uint64
brokerPublishTime *time.Time
@@ -293,9 +295,20 @@
}
func (msg *message) GetSchemaValue(v interface{}) error {
+ if msg.schemaVersion != nil {
+ schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+ if err != nil {
+ return err
+ }
+ return schema.Decode(msg.payLoad, v)
+ }
return msg.schema.Decode(msg.payLoad, v)
}
+func (msg *message) SchemaVersion() []byte {
+ return msg.schemaVersion
+}
+
func (msg *message) ProducerName() string {
return msg.producerName
}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index fb7598e..fe8f628 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -18,6 +18,7 @@
package internal
import (
+ "bytes"
"time"
"github.com/gogo/protobuf/proto"
@@ -49,6 +50,7 @@
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
+ schemaVersion []byte, multiSchemaEnabled bool,
) bool
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
@@ -165,12 +167,21 @@
return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
}
+func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool {
+ if bc.numMessages == 0 {
+ return true
+ }
+ return bytes.Equal(bc.msgMetadata.SchemaVersion, schemaVersion)
+}
+
// Add will add single message to batch.
func (bc *batchContainer) Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
+ schemaVersion []byte, multiSchemaEnabled bool,
) bool {
+
if replicateTo != nil && bc.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
// then we need to force the current batch to flush and send the message individually
@@ -182,6 +193,9 @@
} else if !bc.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to
return false
+ } else if multiSchemaEnabled && !bc.hasSameSchema(schemaVersion) {
+ // The current batch has a different schema. Producer has to call Flush() to
+ return false
}
if bc.numMessages == 0 {
@@ -196,6 +210,7 @@
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
bc.msgMetadata.PartitionKey = metadata.PartitionKey
+ bc.msgMetadata.SchemaVersion = schemaVersion
bc.msgMetadata.Properties = metadata.Properties
if deliverAt.UnixNano() > 0 {
@@ -217,6 +232,7 @@
bc.callbacks = []interface{}{}
bc.msgMetadata.ReplicateTo = nil
bc.msgMetadata.DeliverAtTime = nil
+ bc.msgMetadata.SchemaVersion = nil
bc.msgMetadata.Properties = nil
}
@@ -228,6 +244,7 @@
// No-Op for empty batch
return nil, 0, nil, nil
}
+
bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
bc.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bc.numMessages))
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7fd1885..1af837e 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -213,6 +213,10 @@
cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
case pb.BaseCommand_AUTH_RESPONSE:
cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
+ case pb.BaseCommand_GET_OR_CREATE_SCHEMA:
+ cmd.GetOrCreateSchema = msg.(*pb.CommandGetOrCreateSchema)
+ case pb.BaseCommand_GET_SCHEMA:
+ cmd.GetSchema = msg.(*pb.CommandGetSchema)
default:
panic(fmt.Sprintf("Missing command type: %v", cmdType))
}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 318ec89..48dfd8f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -542,6 +542,9 @@
case pb.BaseCommand_GET_SCHEMA_RESPONSE:
c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
+ case pb.BaseCommand_GET_OR_CREATE_SCHEMA_RESPONSE:
+ c.handleResponse(cmd.GetOrCreateSchemaResponse.GetRequestId(), cmd)
+
case pb.BaseCommand_ERROR:
c.handleResponseError(cmd.GetError())
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 667e855..77fbb8c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -131,6 +131,7 @@
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
+ schemaVersion []byte, multiSchemaEnabled bool,
) bool {
if replicateTo != nil && bc.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
@@ -158,10 +159,14 @@
}
// add message to batch container
- batchPart.Add(
+ add := batchPart.Add(
metadata, sequenceIDGenerator, payload, callback, replicateTo,
deliverAt,
+ schemaVersion, multiSchemaEnabled,
)
+ if !add {
+ return false
+ }
addSingleMessageToBatch(bc.buffer, metadata, payload)
bc.numMessages++
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e4dac1a..24d73db 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -61,6 +61,9 @@
// GetTopicsOfNamespace returns all the topics name for a given namespace.
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
+ // GetSchema returns schema for a given version.
+ GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
+
// Closable Allow Lookup Service's internal client to be able to closed
Closable
}
@@ -87,6 +90,23 @@
}
}
+func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+ id := ls.rpcClient.NewRequestID()
+ req := &pb.CommandGetSchema{
+ RequestId: proto.Uint64(id),
+ Topic: proto.String(topic),
+ SchemaVersion: schemaVersion,
+ }
+ res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
+ if err != nil {
+ return nil, err
+ }
+ if res.Response.Error != nil {
+ return nil, errors.New(res.Response.GetError().String())
+ }
+ return res.Response.GetSchemaResponse.Schema, nil
+}
+
func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
physicalAddress *url.URL, err error) {
if ls.tlsEnabled {
@@ -358,6 +378,9 @@
return topics, nil
}
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+ return nil, errors.New("GetSchema is not supported by httpLookupService")
+}
func (h *httpLookupService) Close() {
h.httpClient.Close()
}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 7f25578..df78ae2 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -120,6 +120,9 @@
return ""
}
+func (msg *mockConsumerMessage) SchemaVersion() []byte {
+ return nil
+}
func (msg *mockConsumerMessage) GetEncryptionContext() *pulsar.EncryptionContext {
return &pulsar.EncryptionContext{}
}
diff --git a/pulsar/message.go b/pulsar/message.go
index b88f158..c117c99 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -65,6 +65,10 @@
// through a `SubscriptionType=Shared` subscription. With other subscription
// types, the messages will still be delivered immediately.
DeliverAt time.Time
+
+ //Schema assign to the current message
+ //Note: messages may have a different schema from producer schema, use it instead of producer schema when assigned
+ Schema Schema
}
// Message abstraction used in Pulsar
@@ -118,6 +122,9 @@
// GetSchemaValue returns the de-serialized value of the message, according to the configuration.
GetSchemaValue(v interface{}) error
+ //SchemaVersion get the schema version of the message, if any
+ SchemaVersion() []byte
+
// GetEncryptionContext returns the ecryption context of the message.
// It will be used by the application to parse the undecrypted message.
GetEncryptionContext() *EncryptionContext
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index e47fb09..537f0da 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -233,6 +233,10 @@
return ""
}
+func (msg *mockMessage1) SchemaVersion() []byte {
+ return nil
+}
+
func (msg *mockMessage1) GetEncryptionContext() *EncryptionContext {
return &EncryptionContext{}
}
@@ -301,6 +305,10 @@
return nil
}
+func (msg *mockMessage2) SchemaVersion() []byte {
+ return nil
+}
+
func (msg *mockMessage2) ProducerName() string {
return ""
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9b2307..fd68631 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -166,6 +166,10 @@
// Default is 1 minute
PartitionsAutoDiscoveryInterval time.Duration
+ // Disable multiple Schame Version
+ // Default false
+ DisableMultiSchema bool
+
// Encryption specifies the fields required to encrypt a message
Encryption *ProducerEncryptionInfo
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5a2694c..ec92415 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@
import (
"context"
+ "errors"
"strings"
"sync"
"sync/atomic"
@@ -86,10 +87,36 @@
schemaInfo *SchemaInfo
partitionIdx int32
metrics *internal.LeveledMetrics
-
- epoch uint64
+ epoch uint64
+ schemaCache *schemaCache
}
+type schemaCache struct {
+ lock sync.RWMutex
+ schemas map[string][]byte
+}
+
+func newSchemaCache() *schemaCache {
+ return &schemaCache{
+ schemas: make(map[string][]byte),
+ }
+}
+
+func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ key := schema.hash()
+ s.schemas[key] = schemaVersion
+}
+
+func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ key := schema.hash()
+ return s.schemas[key]
+}
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
metrics *internal.LeveledMetrics) (
*partitionProducer, error) {
@@ -125,6 +152,7 @@
partitionIdx: int32(partitionIdx),
metrics: metrics,
epoch: 0,
+ schemaCache: newSchemaCache(),
}
if p.options.DisableBatching {
p.batchFlushTicker.Stop()
@@ -179,7 +207,7 @@
// set schema info for producer
- pbSchema := new(pb.Schema)
+ var pbSchema *pb.Schema
if p.schemaInfo != nil {
tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
pbSchema = &pb.Schema{
@@ -188,10 +216,9 @@
SchemaData: []byte(p.schemaInfo.Schema),
Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
}
- p.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
+ p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
} else {
- pbSchema = nil
- p.log.Debug("The partition consumer schema is nil")
+ p.log.Debug("The partition producer schema is nil")
}
cmdProducer := &pb.CommandProducer{
@@ -261,6 +288,12 @@
nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
p.sequenceIDGenerator = &nextSequenceID
}
+
+ schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
+ if len(schemaVersion) != 0 {
+ p.schemaCache.Put(p.schemaInfo, schemaVersion)
+ }
+
p._setConn(res.Cnx)
err = p._getConn().RegisterListener(p.producerID, p)
if err != nil {
@@ -316,6 +349,36 @@
p.connectClosedCh <- connectionClosed{}
}
+func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVersion []byte, err error) {
+
+ tmpSchemaType := pb.Schema_Type(int32(schemaInfo.Type))
+ pbSchema := &pb.Schema{
+ Name: proto.String(schemaInfo.Name),
+ Type: &tmpSchemaType,
+ SchemaData: []byte(schemaInfo.Schema),
+ Properties: internal.ConvertFromStringMap(schemaInfo.Properties),
+ }
+ id := p.client.rpcClient.NewRequestID()
+ req := &pb.CommandGetOrCreateSchema{
+ RequestId: proto.Uint64(id),
+ Topic: proto.String(p.topic),
+ Schema: pbSchema,
+ }
+ res, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_GET_OR_CREATE_SCHEMA, req)
+ if err != nil {
+ return
+ }
+ if res.Response.Error != nil {
+ err = errors.New(res.Response.GetError().String())
+ return
+ }
+ if res.Response.GetOrCreateSchemaResponse.ErrorCode != nil {
+ err = errors.New(*res.Response.GetOrCreateSchemaResponse.ErrorMessage)
+ return
+ }
+ return res.Response.GetOrCreateSchemaResponse.SchemaVersion, nil
+}
+
func (p *partitionProducer) reconnectToBroker() {
var (
maxRetry int
@@ -407,21 +470,54 @@
// read payload from message
payload := msg.Payload
+ var schemaPayload []byte
var err error
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ return
+ }
- // payload and schema are mutually exclusive
- // try to get payload from schema value only if payload is not set
- if payload == nil && p.options.Schema != nil {
- var schemaPayload []byte
- schemaPayload, err = p.options.Schema.Encode(msg.Value)
- if err != nil {
- p.publishSemaphore.Release()
- request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
- p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+ if p.options.DisableMultiSchema {
+ if msg.Schema != nil && p.options.Schema != nil &&
+ msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+ p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
+ }
+ var schema Schema
+ var schemaVersion []byte
+ if msg.Schema != nil {
+ schema = msg.Schema
+ } else if p.options.Schema != nil {
+ schema = p.options.Schema
+ }
+ if msg.Value != nil {
+ // payload and schema are mutually exclusive
+ // try to get payload from schema value only if payload is not set
+ if payload == nil && schema != nil {
+ schemaPayload, err = schema.Encode(msg.Value)
+ if err != nil {
+ p.publishSemaphore.Release()
+ request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
+ p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+ return
+ }
+ }
+ }
+ if payload == nil {
payload = schemaPayload
}
+ if schema != nil {
+ schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+ if schemaVersion == nil {
+ schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+ if err != nil {
+ p.log.WithError(err).Error("get schema version fail")
+ return
+ }
+ p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+ }
+ }
// if msg is too large
if len(payload) > int(p._getConn().GetMaxMessageSize()) {
@@ -476,9 +572,9 @@
if msg.DisableReplication {
msg.ReplicationClusters = []string{"__local__"}
}
-
+ multiSchemaEnabled := !p.options.DisableMultiSchema
added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
- msg.ReplicationClusters, deliverAt)
+ msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
if !added {
// The current batch is full.. flush it and retry
@@ -486,7 +582,7 @@
// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
- msg.ReplicationClusters, deliverAt); !ok {
+ msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errFailAddToBatch)
p.log.WithField("size", len(payload)).
@@ -501,6 +597,7 @@
p.internalFlushCurrentBatch()
}
+
}
type pendingItem struct {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 6b2b5d9..dc13f50 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1380,3 +1380,191 @@
assert.NotNil(t, err)
assert.Nil(t, producer3)
}
+
+func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
+ const MsgBatchCount = 10
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ schema1 := NewAvroSchema(`{"fields":
+ [
+ {"name":"id","type":"int"},
+ {"default":null,"name":"name","type":["null","string"]}
+ ],
+ "name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+ schema2 := NewAvroSchema(`{"fields":
+ [
+ {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
+ {"default":null,"name":"age","type":["null","int"]}
+ ],
+ "name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+ v1 := map[string]interface{}{
+ "id": 1,
+ "name": map[string]interface{}{
+ "string": "aac",
+ },
+ }
+ v2 := map[string]interface{}{
+ "id": 1,
+ "name": map[string]interface{}{
+ "string": "test",
+ },
+ "age": map[string]interface{}{
+ "int": 10,
+ },
+ }
+ topic := newTopicName()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: schema1,
+ BatcherBuilderType: KeyBasedBatchBuilder,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ keys := []string{"key1", "key2", "key3"}
+
+ for i := 0; i < MsgBatchCount; i++ {
+ var messageContent []byte
+ var schema Schema
+ for _, key := range keys {
+ if i%2 == 0 {
+ messageContent, err = schema1.Encode(v1)
+ schema = schema1
+ assert.NoError(t, err)
+ } else {
+ messageContent, err = schema2.Encode(v2)
+ schema = schema2
+ assert.NoError(t, err)
+ }
+ producer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: messageContent,
+ Key: key,
+ Schema: schema,
+ }, func(id MessageID, producerMessage *ProducerMessage, err error) {
+ assert.NoError(t, err)
+ assert.NotNil(t, id)
+ })
+ }
+
+ }
+ producer.Flush()
+
+ //// create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub2",
+ Type: Failover,
+ Schema: schema1,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ for i := 0; i < MsgBatchCount*len(keys); i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ t.Fatal(err)
+ }
+ var v interface{}
+ err = msg.GetSchemaValue(&v)
+ t.Logf(`schemaVersion: %x recevice %s:%v`, msg.SchemaVersion(), msg.Key(), v)
+ assert.Nil(t, err)
+ }
+}
+
+func TestMultipleSchemaProducerConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ schema1 := NewAvroSchema(`{"fields":
+ [
+ {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+ ],
+ "name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+ schema2 := NewAvroSchema(`{"fields":
+ [
+ {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
+ {"default":null,"name":"age","type":["null","int"]}
+ ],"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+ v1 := map[string]interface{}{
+ "id": 1,
+ "name": map[string]interface{}{
+ "string": "aac",
+ },
+ }
+ v2 := map[string]interface{}{
+ "id": 1,
+ "name": map[string]interface{}{
+ "string": "test",
+ },
+ "age": map[string]interface{}{
+ "int": 10,
+ },
+ }
+ topic := newTopicName()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: schema1,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ for i := 0; i < 10; i++ {
+ var messageContent []byte
+ var key string
+ var schema Schema
+ if i%2 == 0 {
+ messageContent, err = schema1.Encode(v1)
+ key = "v1"
+ schema = schema1
+ assert.NoError(t, err)
+ } else {
+ messageContent, err = schema2.Encode(v2)
+ key = "v2"
+ schema = schema2
+ assert.NoError(t, err)
+ }
+ producer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: messageContent,
+ Key: key,
+ Schema: schema,
+ }, func(id MessageID, producerMessage *ProducerMessage, err error) {
+ assert.NoError(t, err)
+ assert.NotNil(t, id)
+ })
+ }
+ producer.Flush()
+
+ //// create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub2",
+ Type: Failover,
+ Schema: schema1,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ t.Fatal(err)
+ }
+ var v interface{}
+ err = msg.GetSchemaValue(&v)
+ t.Logf(`schemaVersion: %x recevice %s:%v`, msg.SchemaVersion(), msg.Key(), v)
+ assert.Nil(t, err)
+ }
+}
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 42499ad..cd6656f 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -19,7 +19,10 @@
import (
"bytes"
+ "crypto/sha256"
+ "encoding/hex"
"encoding/json"
+ "fmt"
"reflect"
"unsafe"
@@ -62,6 +65,12 @@
Properties map[string]string
}
+func (s SchemaInfo) hash() string {
+ h := sha256.New()
+ h.Write([]byte(s.Schema))
+ return hex.EncodeToString(h.Sum(nil))
+}
+
type Schema interface {
Encode(v interface{}) ([]byte, error)
Decode(data []byte, v interface{}) error
@@ -69,6 +78,37 @@
GetSchemaInfo() *SchemaInfo
}
+func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]string) (schema Schema, err error) {
+ var schemaDef = string(schemaData)
+ var s Schema
+ switch schemaType {
+ case STRING:
+ s = NewStringSchema(properties)
+ case JSON:
+ s = NewJSONSchema(schemaDef, properties)
+ case PROTOBUF:
+ s = NewProtoSchema(schemaDef, properties)
+ case AVRO:
+ s = NewAvroSchema(schemaDef, properties)
+ case INT8:
+ s = NewInt8Schema(properties)
+ case INT16:
+ s = NewInt16Schema(properties)
+ case INT32:
+ s = NewInt32Schema(properties)
+ case INT64:
+ s = NewInt64Schema(properties)
+ case FLOAT:
+ s = NewFloatSchema(properties)
+ case DOUBLE:
+ s = NewDoubleSchema(properties)
+ default:
+ err = fmt.Errorf("not support schema type of %v", schemaType)
+ }
+ schema = s
+ return
+}
+
type AvroCodec struct {
Codec *goavro.Codec
}