feat: support multiple schema version for producer and consumer (#611)

* feat: support multiple schema version for producer and consumer

* Fix format the code and add some description for error

* fix: syntax error

* fix lint and CI error
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4cc1645..fac9d4b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"math"
@@ -144,11 +145,57 @@
 	nackTracker *negativeAcksTracker
 	dlq         *dlqRouter
 
-	log log.Logger
-
+	log                  log.Logger
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema, ok := s.cache[key]
+	s.lock.RUnlock()
+	if ok {
+		return schema, nil
+	}
+
+	pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	var properties = internal.ConvertToStringMap(pbSchema.Properties)
+
+	schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+	if err != nil {
+		return nil, err
+	}
+	s.add(key, schema)
+	return schema, nil
+}
+
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	s.cache[schemaVersionHash] = schema
 }
 
 func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
@@ -175,6 +222,7 @@
 		compressionProviders: sync.Map{},
 		dlq:                  dlq,
 		metrics:              metrics,
+		schemaInfoCache:      newSchemaInfoCache(client, options.topic),
 	}
 	pc.setConsumerState(consumerInit)
 	pc.log = client.log.SubLogger(log.Fields{
@@ -687,6 +735,8 @@
 				replicationClusters: msgMeta.GetReplicateTo(),
 				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
+				schemaVersion:       msgMeta.GetSchemaVersion(),
+				schemaInfoCache:     pc.schemaInfoCache,
 				orderingKey:         string(smm.OrderingKey),
 				index:               messageIndex,
 				brokerPublishTime:   brokerPublishTime,
@@ -705,6 +755,8 @@
 				replicationClusters: msgMeta.GetReplicateTo(),
 				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
+				schemaVersion:       msgMeta.GetSchemaVersion(),
+				schemaInfoCache:     pc.schemaInfoCache,
 				index:               messageIndex,
 				brokerPublishTime:   brokerPublishTime,
 			}
@@ -1119,7 +1171,7 @@
 	keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
 	requestID := pc.client.rpcClient.NewRequestID()
 
-	pbSchema := new(pb.Schema)
+	var pbSchema *pb.Schema
 
 	if pc.options.schema != nil && pc.options.schema.GetSchemaInfo() != nil {
 		tmpSchemaType := pb.Schema_Type(int32(pc.options.schema.GetSchemaInfo().Type))
@@ -1131,7 +1183,6 @@
 		}
 		pc.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
 	} else {
-		pbSchema = nil
 		pc.log.Debug("The partition consumer schema is nil")
 	}
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 8248b1a..067439f 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -243,6 +243,8 @@
 	replicatedFrom      string
 	redeliveryCount     uint32
 	schema              Schema
+	schemaVersion       []byte
+	schemaInfoCache     *schemaInfoCache
 	encryptionContext   *EncryptionContext
 	index               *uint64
 	brokerPublishTime   *time.Time
@@ -293,9 +295,20 @@
 }
 
 func (msg *message) GetSchemaValue(v interface{}) error {
+	if msg.schemaVersion != nil {
+		schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+		if err != nil {
+			return err
+		}
+		return schema.Decode(msg.payLoad, v)
+	}
 	return msg.schema.Decode(msg.payLoad, v)
 }
 
+func (msg *message) SchemaVersion() []byte {
+	return msg.schemaVersion
+}
+
 func (msg *message) ProducerName() string {
 	return msg.producerName
 }
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index fb7598e..fe8f628 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+	"bytes"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -49,6 +50,7 @@
 		metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
 		payload []byte,
 		callback interface{}, replicateTo []string, deliverAt time.Time,
+		schemaVersion []byte, multiSchemaEnabled bool,
 	) bool
 
 	// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
@@ -165,12 +167,21 @@
 	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
+func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool {
+	if bc.numMessages == 0 {
+		return true
+	}
+	return bytes.Equal(bc.msgMetadata.SchemaVersion, schemaVersion)
+}
+
 // Add will add single message to batch.
 func (bc *batchContainer) Add(
 	metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
 	payload []byte,
 	callback interface{}, replicateTo []string, deliverAt time.Time,
+	schemaVersion []byte, multiSchemaEnabled bool,
 ) bool {
+
 	if replicateTo != nil && bc.numMessages != 0 {
 		// If the current batch is not empty and we're trying to set the replication clusters,
 		// then we need to force the current batch to flush and send the message individually
@@ -182,6 +193,9 @@
 	} else if !bc.hasSpace(payload) {
 		// The current batch is full. Producer has to call Flush() to
 		return false
+	} else if multiSchemaEnabled && !bc.hasSameSchema(schemaVersion) {
+		// The current batch has a different schema. Producer has to call Flush() to
+		return false
 	}
 
 	if bc.numMessages == 0 {
@@ -196,6 +210,7 @@
 		bc.msgMetadata.ProducerName = &bc.producerName
 		bc.msgMetadata.ReplicateTo = replicateTo
 		bc.msgMetadata.PartitionKey = metadata.PartitionKey
+		bc.msgMetadata.SchemaVersion = schemaVersion
 		bc.msgMetadata.Properties = metadata.Properties
 
 		if deliverAt.UnixNano() > 0 {
@@ -217,6 +232,7 @@
 	bc.callbacks = []interface{}{}
 	bc.msgMetadata.ReplicateTo = nil
 	bc.msgMetadata.DeliverAtTime = nil
+	bc.msgMetadata.SchemaVersion = nil
 	bc.msgMetadata.Properties = nil
 }
 
@@ -228,6 +244,7 @@
 		// No-Op for empty batch
 		return nil, 0, nil, nil
 	}
+
 	bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
 
 	bc.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bc.numMessages))
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7fd1885..1af837e 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -213,6 +213,10 @@
 		cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
 	case pb.BaseCommand_AUTH_RESPONSE:
 		cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
+	case pb.BaseCommand_GET_OR_CREATE_SCHEMA:
+		cmd.GetOrCreateSchema = msg.(*pb.CommandGetOrCreateSchema)
+	case pb.BaseCommand_GET_SCHEMA:
+		cmd.GetSchema = msg.(*pb.CommandGetSchema)
 	default:
 		panic(fmt.Sprintf("Missing command type: %v", cmdType))
 	}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 318ec89..48dfd8f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -542,6 +542,9 @@
 	case pb.BaseCommand_GET_SCHEMA_RESPONSE:
 		c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
 
+	case pb.BaseCommand_GET_OR_CREATE_SCHEMA_RESPONSE:
+		c.handleResponse(cmd.GetOrCreateSchemaResponse.GetRequestId(), cmd)
+
 	case pb.BaseCommand_ERROR:
 		c.handleResponseError(cmd.GetError())
 
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 667e855..77fbb8c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -131,6 +131,7 @@
 	metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
 	payload []byte,
 	callback interface{}, replicateTo []string, deliverAt time.Time,
+	schemaVersion []byte, multiSchemaEnabled bool,
 ) bool {
 	if replicateTo != nil && bc.numMessages != 0 {
 		// If the current batch is not empty and we're trying to set the replication clusters,
@@ -158,10 +159,14 @@
 	}
 
 	// add message to batch container
-	batchPart.Add(
+	add := batchPart.Add(
 		metadata, sequenceIDGenerator, payload, callback, replicateTo,
 		deliverAt,
+		schemaVersion, multiSchemaEnabled,
 	)
+	if !add {
+		return false
+	}
 	addSingleMessageToBatch(bc.buffer, metadata, payload)
 
 	bc.numMessages++
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e4dac1a..24d73db 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -61,6 +61,9 @@
 	// GetTopicsOfNamespace returns all the topics name for a given namespace.
 	GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
 
+	// GetSchema returns schema for a given version.
+	GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
+
 	// Closable Allow Lookup Service's internal client to be able to closed
 	Closable
 }
@@ -87,6 +90,23 @@
 	}
 }
 
+func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+	id := ls.rpcClient.NewRequestID()
+	req := &pb.CommandGetSchema{
+		RequestId:     proto.Uint64(id),
+		Topic:         proto.String(topic),
+		SchemaVersion: schemaVersion,
+	}
+	res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
+	if err != nil {
+		return nil, err
+	}
+	if res.Response.Error != nil {
+		return nil, errors.New(res.Response.GetError().String())
+	}
+	return res.Response.GetSchemaResponse.Schema, nil
+}
+
 func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
 	physicalAddress *url.URL, err error) {
 	if ls.tlsEnabled {
@@ -358,6 +378,9 @@
 	return topics, nil
 }
 
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+	return nil, errors.New("GetSchema is not supported by httpLookupService")
+}
 func (h *httpLookupService) Close() {
 	h.httpClient.Close()
 }
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 7f25578..df78ae2 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -120,6 +120,9 @@
 	return ""
 }
 
+func (msg *mockConsumerMessage) SchemaVersion() []byte {
+	return nil
+}
 func (msg *mockConsumerMessage) GetEncryptionContext() *pulsar.EncryptionContext {
 	return &pulsar.EncryptionContext{}
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index b88f158..c117c99 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -65,6 +65,10 @@
 	//     through a `SubscriptionType=Shared` subscription. With other subscription
 	//     types, the messages will still be delivered immediately.
 	DeliverAt time.Time
+
+	//Schema assign to the current message
+	//Note: messages may have a different schema from producer schema, use it instead of producer schema when assigned
+	Schema Schema
 }
 
 // Message abstraction used in Pulsar
@@ -118,6 +122,9 @@
 	// GetSchemaValue returns the de-serialized value of the message, according to the configuration.
 	GetSchemaValue(v interface{}) error
 
+	//SchemaVersion get the schema version of the message, if any
+	SchemaVersion() []byte
+
 	// GetEncryptionContext returns the ecryption context of the message.
 	// It will be used by the application to parse the undecrypted message.
 	GetEncryptionContext() *EncryptionContext
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index e47fb09..537f0da 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -233,6 +233,10 @@
 	return ""
 }
 
+func (msg *mockMessage1) SchemaVersion() []byte {
+	return nil
+}
+
 func (msg *mockMessage1) GetEncryptionContext() *EncryptionContext {
 	return &EncryptionContext{}
 }
@@ -301,6 +305,10 @@
 	return nil
 }
 
+func (msg *mockMessage2) SchemaVersion() []byte {
+	return nil
+}
+
 func (msg *mockMessage2) ProducerName() string {
 	return ""
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9b2307..fd68631 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -166,6 +166,10 @@
 	// Default is 1 minute
 	PartitionsAutoDiscoveryInterval time.Duration
 
+	// Disable multiple Schame Version
+	// Default false
+	DisableMultiSchema bool
+
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5a2694c..ec92415 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"errors"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -86,10 +87,36 @@
 	schemaInfo       *SchemaInfo
 	partitionIdx     int32
 	metrics          *internal.LeveledMetrics
-
-	epoch uint64
+	epoch            uint64
+	schemaCache      *schemaCache
 }
 
+type schemaCache struct {
+	lock    sync.RWMutex
+	schemas map[string][]byte
+}
+
+func newSchemaCache() *schemaCache {
+	return &schemaCache{
+		schemas: make(map[string][]byte),
+	}
+}
+
+func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	key := schema.hash()
+	s.schemas[key] = schemaVersion
+}
+
+func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	key := schema.hash()
+	return s.schemas[key]
+}
 func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
 	metrics *internal.LeveledMetrics) (
 	*partitionProducer, error) {
@@ -125,6 +152,7 @@
 		partitionIdx:     int32(partitionIdx),
 		metrics:          metrics,
 		epoch:            0,
+		schemaCache:      newSchemaCache(),
 	}
 	if p.options.DisableBatching {
 		p.batchFlushTicker.Stop()
@@ -179,7 +207,7 @@
 
 	// set schema info for producer
 
-	pbSchema := new(pb.Schema)
+	var pbSchema *pb.Schema
 	if p.schemaInfo != nil {
 		tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
 		pbSchema = &pb.Schema{
@@ -188,10 +216,9 @@
 			SchemaData: []byte(p.schemaInfo.Schema),
 			Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
 		}
-		p.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
+		p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
 	} else {
-		pbSchema = nil
-		p.log.Debug("The partition consumer schema is nil")
+		p.log.Debug("The partition producer schema is nil")
 	}
 
 	cmdProducer := &pb.CommandProducer{
@@ -261,6 +288,12 @@
 		nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
 		p.sequenceIDGenerator = &nextSequenceID
 	}
+
+	schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
+	if len(schemaVersion) != 0 {
+		p.schemaCache.Put(p.schemaInfo, schemaVersion)
+	}
+
 	p._setConn(res.Cnx)
 	err = p._getConn().RegisterListener(p.producerID, p)
 	if err != nil {
@@ -316,6 +349,36 @@
 	p.connectClosedCh <- connectionClosed{}
 }
 
+func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVersion []byte, err error) {
+
+	tmpSchemaType := pb.Schema_Type(int32(schemaInfo.Type))
+	pbSchema := &pb.Schema{
+		Name:       proto.String(schemaInfo.Name),
+		Type:       &tmpSchemaType,
+		SchemaData: []byte(schemaInfo.Schema),
+		Properties: internal.ConvertFromStringMap(schemaInfo.Properties),
+	}
+	id := p.client.rpcClient.NewRequestID()
+	req := &pb.CommandGetOrCreateSchema{
+		RequestId: proto.Uint64(id),
+		Topic:     proto.String(p.topic),
+		Schema:    pbSchema,
+	}
+	res, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_GET_OR_CREATE_SCHEMA, req)
+	if err != nil {
+		return
+	}
+	if res.Response.Error != nil {
+		err = errors.New(res.Response.GetError().String())
+		return
+	}
+	if res.Response.GetOrCreateSchemaResponse.ErrorCode != nil {
+		err = errors.New(*res.Response.GetOrCreateSchemaResponse.ErrorMessage)
+		return
+	}
+	return res.Response.GetOrCreateSchemaResponse.SchemaVersion, nil
+}
+
 func (p *partitionProducer) reconnectToBroker() {
 	var (
 		maxRetry int
@@ -407,21 +470,54 @@
 	// read payload from message
 	payload := msg.Payload
 
+	var schemaPayload []byte
 	var err error
+	if msg.Value != nil && msg.Payload != nil {
+		p.log.Error("Can not set Value and Payload both")
+		return
+	}
 
-	// payload and schema are mutually exclusive
-	// try to get payload from schema value only if payload is not set
-	if payload == nil && p.options.Schema != nil {
-		var schemaPayload []byte
-		schemaPayload, err = p.options.Schema.Encode(msg.Value)
-		if err != nil {
-			p.publishSemaphore.Release()
-			request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
-			p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+	if p.options.DisableMultiSchema {
+		if msg.Schema != nil && p.options.Schema != nil &&
+			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
 			return
 		}
+	}
+	var schema Schema
+	var schemaVersion []byte
+	if msg.Schema != nil {
+		schema = msg.Schema
+	} else if p.options.Schema != nil {
+		schema = p.options.Schema
+	}
+	if msg.Value != nil {
+		// payload and schema are mutually exclusive
+		// try to get payload from schema value only if payload is not set
+		if payload == nil && schema != nil {
+			schemaPayload, err = schema.Encode(msg.Value)
+			if err != nil {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
+				p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+				return
+			}
+		}
+	}
+	if payload == nil {
 		payload = schemaPayload
 	}
+	if schema != nil {
+		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+		if schemaVersion == nil {
+			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+			if err != nil {
+				p.log.WithError(err).Error("get schema version fail")
+				return
+			}
+			p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+		}
+	}
 
 	// if msg is too large
 	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
@@ -476,9 +572,9 @@
 	if msg.DisableReplication {
 		msg.ReplicationClusters = []string{"__local__"}
 	}
-
+	multiSchemaEnabled := !p.options.DisableMultiSchema
 	added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-		msg.ReplicationClusters, deliverAt)
+		msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
 	if !added {
 		// The current batch is full.. flush it and retry
 
@@ -486,7 +582,7 @@
 
 		// after flushing try again to add the current payload
 		if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-			msg.ReplicationClusters, deliverAt); !ok {
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
 			p.publishSemaphore.Release()
 			request.callback(nil, request.msg, errFailAddToBatch)
 			p.log.WithField("size", len(payload)).
@@ -501,6 +597,7 @@
 		p.internalFlushCurrentBatch()
 
 	}
+
 }
 
 type pendingItem struct {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 6b2b5d9..dc13f50 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1380,3 +1380,191 @@
 	assert.NotNil(t, err)
 	assert.Nil(t, producer3)
 }
+
+func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
+	const MsgBatchCount = 10
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	schema1 := NewAvroSchema(`{"fields":
+		[	
+			{"name":"id","type":"int"},
+			{"default":null,"name":"name","type":["null","string"]}
+		],
+		"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+	schema2 := NewAvroSchema(`{"fields":
+		[
+				{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
+			   {"default":null,"name":"age","type":["null","int"]}
+		],
+		"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+	v1 := map[string]interface{}{
+		"id": 1,
+		"name": map[string]interface{}{
+			"string": "aac",
+		},
+	}
+	v2 := map[string]interface{}{
+		"id": 1,
+		"name": map[string]interface{}{
+			"string": "test",
+		},
+		"age": map[string]interface{}{
+			"int": 10,
+		},
+	}
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:              topic,
+		Schema:             schema1,
+		BatcherBuilderType: KeyBasedBatchBuilder,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	keys := []string{"key1", "key2", "key3"}
+
+	for i := 0; i < MsgBatchCount; i++ {
+		var messageContent []byte
+		var schema Schema
+		for _, key := range keys {
+			if i%2 == 0 {
+				messageContent, err = schema1.Encode(v1)
+				schema = schema1
+				assert.NoError(t, err)
+			} else {
+				messageContent, err = schema2.Encode(v2)
+				schema = schema2
+				assert.NoError(t, err)
+			}
+			producer.SendAsync(context.Background(), &ProducerMessage{
+				Payload: messageContent,
+				Key:     key,
+				Schema:  schema,
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.NoError(t, err)
+				assert.NotNil(t, id)
+			})
+		}
+
+	}
+	producer.Flush()
+
+	//// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            "my-sub2",
+		Type:                        Failover,
+		Schema:                      schema1,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	for i := 0; i < MsgBatchCount*len(keys); i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+		var v interface{}
+		err = msg.GetSchemaValue(&v)
+		t.Logf(`schemaVersion: %x recevice %s:%v`, msg.SchemaVersion(), msg.Key(), v)
+		assert.Nil(t, err)
+	}
+}
+
+func TestMultipleSchemaProducerConsumer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	schema1 := NewAvroSchema(`{"fields":
+		[
+			{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+		],
+		"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+	schema2 := NewAvroSchema(`{"fields":
+		[
+			{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
+			{"default":null,"name":"age","type":["null","int"]}
+		],"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+	v1 := map[string]interface{}{
+		"id": 1,
+		"name": map[string]interface{}{
+			"string": "aac",
+		},
+	}
+	v2 := map[string]interface{}{
+		"id": 1,
+		"name": map[string]interface{}{
+			"string": "test",
+		},
+		"age": map[string]interface{}{
+			"int": 10,
+		},
+	}
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  topic,
+		Schema: schema1,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	for i := 0; i < 10; i++ {
+		var messageContent []byte
+		var key string
+		var schema Schema
+		if i%2 == 0 {
+			messageContent, err = schema1.Encode(v1)
+			key = "v1"
+			schema = schema1
+			assert.NoError(t, err)
+		} else {
+			messageContent, err = schema2.Encode(v2)
+			key = "v2"
+			schema = schema2
+			assert.NoError(t, err)
+		}
+		producer.SendAsync(context.Background(), &ProducerMessage{
+			Payload: messageContent,
+			Key:     key,
+			Schema:  schema,
+		}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+			assert.NoError(t, err)
+			assert.NotNil(t, id)
+		})
+	}
+	producer.Flush()
+
+	//// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            "my-sub2",
+		Type:                        Failover,
+		Schema:                      schema1,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+		var v interface{}
+		err = msg.GetSchemaValue(&v)
+		t.Logf(`schemaVersion: %x recevice %s:%v`, msg.SchemaVersion(), msg.Key(), v)
+		assert.Nil(t, err)
+	}
+}
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 42499ad..cd6656f 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -19,7 +19,10 @@
 
 import (
 	"bytes"
+	"crypto/sha256"
+	"encoding/hex"
 	"encoding/json"
+	"fmt"
 	"reflect"
 	"unsafe"
 
@@ -62,6 +65,12 @@
 	Properties map[string]string
 }
 
+func (s SchemaInfo) hash() string {
+	h := sha256.New()
+	h.Write([]byte(s.Schema))
+	return hex.EncodeToString(h.Sum(nil))
+}
+
 type Schema interface {
 	Encode(v interface{}) ([]byte, error)
 	Decode(data []byte, v interface{}) error
@@ -69,6 +78,37 @@
 	GetSchemaInfo() *SchemaInfo
 }
 
+func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]string) (schema Schema, err error) {
+	var schemaDef = string(schemaData)
+	var s Schema
+	switch schemaType {
+	case STRING:
+		s = NewStringSchema(properties)
+	case JSON:
+		s = NewJSONSchema(schemaDef, properties)
+	case PROTOBUF:
+		s = NewProtoSchema(schemaDef, properties)
+	case AVRO:
+		s = NewAvroSchema(schemaDef, properties)
+	case INT8:
+		s = NewInt8Schema(properties)
+	case INT16:
+		s = NewInt16Schema(properties)
+	case INT32:
+		s = NewInt32Schema(properties)
+	case INT64:
+		s = NewInt64Schema(properties)
+	case FLOAT:
+		s = NewFloatSchema(properties)
+	case DOUBLE:
+		s = NewDoubleSchema(properties)
+	default:
+		err = fmt.Errorf("not support schema type of %v", schemaType)
+	}
+	schema = s
+	return
+}
+
 type AvroCodec struct {
 	Codec *goavro.Codec
 }