Add schema logic in producer and consumer (#368)

Signed-off-by: xiaolong.ran <rxl@apache.org>

Fixes #344 

### Motivation

Type safety is extremely important in any application built around a message bus like Pulsar. Currently, Apache Pulsar supports the function of Schema Registry. And Java, CPP and Python clients already support schema registry related functions, In order to further improve the function of Go Client, we need to support the function of Schema Registry in Go Client.

### Modifications

- Add schema logic in producer
- Add schema logic in consumer
- Package basic tool functions for schema logic
diff --git a/.golangci.yml b/.golangci.yml
index 4467782..e87841b 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -13,10 +13,8 @@
     - golint
     - gosimple
     - govet
-    - ineffassign
     - interfacer
     - misspell
-    - staticcheck
     - structcheck
     - stylecheck
     - typecheck
diff --git a/go.mod b/go.mod
index 3e41d06..8589bf3 100644
--- a/go.mod
+++ b/go.mod
@@ -7,10 +7,14 @@
 	github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb
 	github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
 	github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
+	github.com/davecgh/go-spew v1.1.1
 	github.com/gogo/protobuf v1.3.1
+	github.com/golang/protobuf v1.4.2
+	github.com/golang/snappy v0.0.1 // indirect
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/klauspost/compress v1.10.8
 	github.com/kr/pretty v0.2.0 // indirect
+	github.com/linkedin/goavro/v2 v2.9.8 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
diff --git a/go.sum b/go.sum
index e3bc434..a70caec 100644
--- a/go.sum
+++ b/go.sum
@@ -56,6 +56,8 @@
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
@@ -88,6 +90,11 @@
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/linkedin/goavro v1.0.5 h1:6ds0AI8upkEoafDk0a5r9q1p/xRtMq47jCilZYEqbmg=
+github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
+github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
+github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
+github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
 github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
diff --git a/integration-tests/license_test.go b/integration-tests/license_test.go
index 0aa9c84..e829560 100644
--- a/integration-tests/license_test.go
+++ b/integration-tests/license_test.go
@@ -67,6 +67,7 @@
 var skip = map[string]bool{
 	"../pulsar/internal/pulsar_proto/PulsarApi.pb.go": true,
 	"../.github/workflows/bot.yaml":                   true,
+	"../integration-tests/pb/hello.pb.go":             true,
 }
 
 func TestLicense(t *testing.T) {
diff --git a/integration-tests/pb/build.sh b/integration-tests/pb/build.sh
new file mode 100755
index 0000000..3987795
--- /dev/null
+++ b/integration-tests/pb/build.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pkg=pb
+protoc --go_out=import_path=${pkg}:. hello.proto
+
diff --git a/integration-tests/pb/hello.pb.go b/integration-tests/pb/hello.pb.go
new file mode 100644
index 0000000..7cdf086
--- /dev/null
+++ b/integration-tests/pb/hello.pb.go
@@ -0,0 +1,100 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: hello.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Test struct {
+	Num                  int32    `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
+	Msf                  string   `protobuf:"bytes,2,opt,name=msf,proto3" json:"msf,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *Test) Reset()         { *m = Test{} }
+func (m *Test) String() string { return proto.CompactTextString(m) }
+func (*Test) ProtoMessage()    {}
+func (*Test) Descriptor() ([]byte, []int) {
+	return fileDescriptor_hello_38c7a10202078446, []int{0}
+}
+func (m *Test) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_Test.Unmarshal(m, b)
+}
+func (m *Test) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_Test.Marshal(b, m, deterministic)
+}
+func (dst *Test) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_Test.Merge(dst, src)
+}
+func (m *Test) XXX_Size() int {
+	return xxx_messageInfo_Test.Size(m)
+}
+func (m *Test) XXX_DiscardUnknown() {
+	xxx_messageInfo_Test.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Test proto.InternalMessageInfo
+
+func (m *Test) GetNum() int32 {
+	if m != nil {
+		return m.Num
+	}
+	return 0
+}
+
+func (m *Test) GetMsf() string {
+	if m != nil {
+		return m.Msf
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*Test)(nil), "prototest.Test")
+}
+
+func init() { proto.RegisterFile("hello.proto", fileDescriptor_hello_38c7a10202078446) }
+
+var fileDescriptor_hello_38c7a10202078446 = []byte{
+	// 87 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x48, 0xcd, 0xc9,
+	0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x04, 0x53, 0x25, 0xa9, 0xc5, 0x25, 0x4a,
+	0x5a, 0x5c, 0x2c, 0x21, 0xa9, 0xc5, 0x25, 0x42, 0x02, 0x5c, 0xcc, 0x79, 0xa5, 0xb9, 0x12, 0x8c,
+	0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x20, 0x26, 0x48, 0x24, 0xb7, 0x38, 0x4d, 0x82, 0x49, 0x81, 0x51,
+	0x83, 0x33, 0x08, 0xc4, 0x4c, 0x62, 0x03, 0x6b, 0x33, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xc5,
+	0x3d, 0x96, 0x7b, 0x4c, 0x00, 0x00, 0x00,
+}
diff --git a/integration-tests/pb/hello.proto b/integration-tests/pb/hello.proto
new file mode 100644
index 0000000..547e273
--- /dev/null
+++ b/integration-tests/pb/hello.proto
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+package prototest;
+
+message Test {
+    int32 num = 1;
+    string msf = 2;
+}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index e9f803d..1c52b29 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -151,6 +151,8 @@
 	// A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface.
 	Interceptors ConsumerInterceptors
 
+	Schema Schema
+
 	// MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
 	MaxReconnectToBroker *uint
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ed2af4b..dceb1e2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -103,6 +103,12 @@
 		options.Name = generateRandomName()
 	}
 
+	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
+		if options.Schema.GetSchemaInfo().Type == NONE {
+			options.Schema = NewBytesSchema(nil)
+		}
+	}
+
 	// did the user pass in a message channel?
 	messageCh := options.MessageChannel
 	if options.MessageChannel == nil {
@@ -311,6 +317,7 @@
 				interceptors:               c.options.Interceptors,
 				maxReconnectToBroker:       c.options.MaxReconnectToBroker,
 				keySharedPolicy:            c.options.KeySharedPolicy,
+				schema:                     c.options.Schema,
 			}
 			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
 			ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2d5ff6b..5ec51fb 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -122,6 +122,7 @@
 	interceptors               ConsumerInterceptors
 	maxReconnectToBroker       *uint
 	keySharedPolicy            *KeySharedPolicy
+	schema                     Schema
 }
 
 type partitionConsumer struct {
@@ -521,6 +522,7 @@
 				topic:               pc.topic,
 				msgID:               msgID,
 				payLoad:             payload,
+				schema:              pc.options.schema,
 				replicationClusters: msgMeta.GetReplicateTo(),
 				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
@@ -535,6 +537,7 @@
 				topic:               pc.topic,
 				msgID:               msgID,
 				payLoad:             payload,
+				schema:              pc.options.schema,
 				replicationClusters: msgMeta.GetReplicateTo(),
 				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
@@ -862,6 +865,23 @@
 	initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
 	keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
 	requestID := pc.client.rpcClient.NewRequestID()
+
+	pbSchema := new(pb.Schema)
+
+	if pc.options.schema != nil && pc.options.schema.GetSchemaInfo() != nil {
+		tmpSchemaType := pb.Schema_Type(int32(pc.options.schema.GetSchemaInfo().Type))
+		pbSchema = &pb.Schema{
+			Name:       proto.String(pc.options.schema.GetSchemaInfo().Name),
+			Type:       &tmpSchemaType,
+			SchemaData: []byte(pc.options.schema.GetSchemaInfo().Schema),
+			Properties: internal.ConvertFromStringMap(pc.options.schema.GetSchemaInfo().Properties),
+		}
+		pc.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
+	} else {
+		pbSchema = nil
+		pc.log.Debug("The partition consumer schema is nil")
+	}
+
 	cmdSubscribe := &pb.CommandSubscribe{
 		Topic:                      proto.String(pc.topic),
 		Subscription:               proto.String(pc.options.subscription),
@@ -873,7 +893,7 @@
 		Durable:                    proto.Bool(pc.options.subscriptionMode == durable),
 		Metadata:                   internal.ConvertFromStringMap(pc.options.metadata),
 		ReadCompacted:              proto.Bool(pc.options.readCompacted),
-		Schema:                     nil,
+		Schema:                     pbSchema,
 		InitialPosition:            initialPosition.Enum(),
 		ReplicateSubscriptionState: proto.Bool(pc.options.replicateSubscriptionState),
 		KeySharedMeta:              keySharedMeta,
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d796969..27cb0ed 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -178,6 +178,7 @@
 	replicationClusters []string
 	replicatedFrom      string
 	redeliveryCount     uint32
+	schema              Schema
 }
 
 func (msg *message) Topic() string {
@@ -220,6 +221,10 @@
 	return msg.replicatedFrom
 }
 
+func (msg *message) GetSchemaValue(v interface{}) error {
+	return msg.schema.Decode(msg.payLoad, v)
+}
+
 func (msg *message) ProducerName() string {
 	return msg.producerName
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index 5137836..a3b2257 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -27,6 +27,9 @@
 	// Payload for the message
 	Payload []byte
 
+	//Value and payload is mutually exclusive, `Value interface{}` for schema message.
+	Value interface{}
+
 	// Key sets the key of the message for routing policy
 	Key string
 
@@ -102,6 +105,9 @@
 
 	// Get name of cluster, from which the message is replicated.
 	GetReplicatedFrom() string
+
+	//Get the de-serialized value of the message, according the configured
+	GetSchemaValue(v interface{}) error
 }
 
 // MessageID identifier for a particular message
diff --git a/pulsar/primitiveSerDe.go b/pulsar/primitiveSerDe.go
new file mode 100644
index 0000000..0d53aa1
--- /dev/null
+++ b/pulsar/primitiveSerDe.go
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"math"
+)
+
+const (
+	IoMaxSize     = 1024
+	maxBorrowSize = 10
+)
+
+var (
+	littleEndian = binary.LittleEndian
+)
+
+type BinaryFreeList chan []byte
+
+var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
+
+func (b BinaryFreeList) Borrow() (buf []byte) {
+	select {
+	case buf = <-b:
+	default:
+		buf = make([]byte, maxBorrowSize)
+
+	}
+	return buf[:maxBorrowSize]
+}
+
+func (b BinaryFreeList) Return(buf []byte) {
+	select {
+	case b <- buf:
+	default:
+	}
+}
+
+func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error) {
+	buf := b.Borrow()[:1]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := buf[0]
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error) {
+	buf := b.Borrow()[:2]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := byteOrder.Uint16(buf)
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error) {
+	buf := b.Borrow()[:4]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := byteOrder.Uint32(buf)
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error) {
+	buf := b.Borrow()[:8]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := byteOrder.Uint64(buf)
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
+	if len(buf) < 8 {
+		return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
+	}
+	return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
+}
+
+func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
+	if len(buf) < 4 {
+		return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
+	}
+	return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
+}
+
+func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error {
+	buf := b.Borrow()[:1]
+	buf[0] = val
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error {
+	buf := b.Borrow()[:2]
+	byteOrder.PutUint16(buf, val)
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error {
+
+	buf := b.Borrow()[:4]
+	byteOrder.PutUint32(buf, val)
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error {
+	buf := b.Borrow()[:8]
+	byteOrder.PutUint64(buf, val)
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error) {
+	var value float64
+	switch v := datum.(type) {
+	case float64:
+		value = v
+	case float32:
+		value = float64(v)
+	case int:
+		if value = float64(v); int(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int would lose precision: %d", v)
+		}
+	case int64:
+		if value = float64(v); int64(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int64 would lose precision: %d", v)
+		}
+	case int32:
+		if value = float64(v); int32(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int32 would lose precision: %d", v)
+		}
+	default:
+		return nil, fmt.Errorf("serialize failed: expected: Go numeric; received: %T", datum)
+	}
+	var buf []byte
+	buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0)
+	binary.BigEndian.PutUint64(buf[len(buf)-8:], math.Float64bits(value))
+	return buf, nil
+}
+
+func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error) {
+	var value float32
+	switch v := datum.(type) {
+	case float32:
+		value = v
+	case float64:
+		value = float32(v)
+	case int:
+		if value = float32(v); int(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int would lose precision: %d", v)
+		}
+	case int64:
+		if value = float32(v); int64(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int64 would lose precision: %d", v)
+		}
+	case int32:
+		if value = float32(v); int32(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int32 would lose precision: %d", v)
+		}
+	default:
+		return nil, fmt.Errorf("serialize failed: expected: Go numeric; received: %T", datum)
+	}
+	var buf []byte
+	buf = append(buf, 0, 0, 0, 0)
+	binary.BigEndian.PutUint32(buf[len(buf)-4:], math.Float32bits(value))
+	return buf, nil
+}
+
+func ReadElements(r io.Reader, elements ...interface{}) error {
+	for _, element := range elements {
+		err := readElement(r, element)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func WriteElements(w io.Writer, elements ...interface{}) error {
+	for _, element := range elements {
+		err := writeElement(w, element)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func readElement(r io.Reader, element interface{}) error {
+	switch e := element.(type) {
+	case *int8:
+		rv, err := BinarySerializer.Uint8(r)
+		if err != nil {
+			return err
+		}
+		*e = int8(rv)
+		return nil
+
+	case *int16:
+		rv, err := BinarySerializer.Uint16(r, littleEndian)
+		if err != nil {
+			return err
+		}
+		*e = int16(rv)
+		return nil
+
+	case *int32:
+		rv, err := BinarySerializer.Uint32(r, littleEndian)
+		if err != nil {
+			return err
+		}
+		*e = int32(rv)
+		return nil
+
+	case *int64:
+		rv, err := BinarySerializer.Uint64(r, littleEndian)
+		if err != nil {
+			return err
+		}
+		*e = int64(rv)
+		return nil
+
+	case *bool:
+		rv, err := BinarySerializer.Uint8(r)
+		if err != nil {
+			return err
+		}
+		if rv == 0x00 {
+			*e = false
+		} else {
+			*e = true
+		}
+		return nil
+	}
+	return binary.Read(r, littleEndian, element)
+}
+
+func writeElement(w io.Writer, element interface{}) error {
+	switch e := element.(type) {
+	case int8:
+		err := BinarySerializer.PutUint8(w, uint8(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case int16:
+		err := BinarySerializer.PutUint16(w, littleEndian, uint16(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case int32:
+		err := BinarySerializer.PutUint32(w, littleEndian, uint32(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case int64:
+		err := BinarySerializer.PutUint64(w, littleEndian, uint64(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case bool:
+		var err error
+		if e {
+			err = BinarySerializer.PutUint8(w, 0x01)
+		} else {
+			err = BinarySerializer.PutUint8(w, 0x00)
+		}
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+	return binary.Write(w, littleEndian, element)
+}
diff --git a/pulsar/primitiveSerDe_test.go b/pulsar/primitiveSerDe_test.go
new file mode 100644
index 0000000..6a2901e
--- /dev/null
+++ b/pulsar/primitiveSerDe_test.go
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"bytes"
+	"io"
+	"reflect"
+	"testing"
+
+	"github.com/davecgh/go-spew/spew"
+)
+
+func TestWriteElements(t *testing.T) {
+	tests := []struct {
+		in  interface{}
+		buf []byte
+	}{
+		{int8(1), []byte{0x01}},
+		{uint8(2), []byte{0x02}},
+		{int16(4), []byte{0x04, 0x00}},
+		{uint16(16), []byte{0x10, 0x00}},
+		{int32(1), []byte{0x01, 0x00, 0x00, 0x00}},
+		{uint32(256), []byte{0x00, 0x01, 0x00, 0x00}},
+		{
+			int64(65536),
+			[]byte{0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00},
+		},
+		{
+			uint64(4294967296),
+			[]byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00},
+		},
+		{
+			true,
+			[]byte{0x01},
+		},
+		{
+			false,
+			[]byte{0x00},
+		},
+	}
+
+	t.Logf("Running %d tests", len(tests))
+	for i, test := range tests {
+		value := test
+
+		var buf bytes.Buffer
+		err := WriteElements(&buf, value.in)
+		if err != nil {
+			t.Errorf("writeElement #%d error %v", i, err)
+			continue
+		}
+		if !bytes.Equal(buf.Bytes(), test.buf) {
+			t.Error(test.in)
+			t.Errorf("writeElement #%d\n got: %s want: %s", i,
+				spew.Sdump(buf.Bytes()), spew.Sdump(test.buf))
+			continue
+		}
+
+		// Read from wire format.
+		rbuf := bytes.NewReader(test.buf)
+		val := test.in
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			val = reflect.New(reflect.TypeOf(test.in)).Interface()
+		}
+		err = ReadElements(rbuf, val)
+		if err != nil {
+			t.Errorf("readElement #%d error %v", i, err)
+			continue
+		}
+		ival := val
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			ival = reflect.Indirect(reflect.ValueOf(val)).Interface()
+		}
+		if !reflect.DeepEqual(ival, test.in) {
+			t.Errorf("readElement #%d\n got: %s want: %s", i,
+				spew.Sdump(ival), spew.Sdump(test.in))
+			continue
+		}
+	}
+}
+
+func TestElementErrors(t *testing.T) {
+	tests := []struct {
+		in       interface{}
+		max      int
+		writeErr error
+		readErr  error
+	}{
+		{int8(1), 0, nil, io.EOF},
+		{uint8(2), 0, nil, io.EOF},
+		{int16(4), 0, nil, io.EOF},
+		{uint16(16), 0, nil, io.EOF},
+		{int32(1), 0, nil, io.EOF},
+		{uint32(256), 0, nil, io.EOF},
+		{
+			int64(65536),
+			0, nil, io.EOF,
+		},
+		{
+			uint64(4294967296),
+			0, nil, io.EOF,
+		},
+		{
+			true,
+			0, nil, io.EOF,
+		},
+		{
+			false,
+			0, nil, io.EOF,
+		},
+	}
+
+	t.Logf("Running %d tests", len(tests))
+	for i, test := range tests {
+		var r bytes.Reader
+		val := test.in
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			val = reflect.New(reflect.TypeOf(test.in)).Interface()
+		}
+		err := ReadElements(&r, val)
+		if err != test.readErr {
+			t.Errorf("readElement #%d wrong error got: %v, want: %v",
+				i, err, test.readErr)
+			continue
+		}
+	}
+}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a6cee38..a2b7526 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -139,6 +139,8 @@
 	// A chain of interceptors, These interceptors will be called at some points defined in ProducerInterceptor interface
 	Interceptors ProducerInterceptors
 
+	Schema Schema
+
 	// MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
 	MaxReconnectToBroker *uint
 }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 42ddf0b..c166f70 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -126,6 +126,12 @@
 		p.messageRouter = options.MessageRouter
 	}
 
+	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
+		if options.Schema.GetSchemaInfo().Type == NONE {
+			options.Schema = NewBytesSchema(nil)
+		}
+	}
+
 	err := p.internalCreatePartitionsProducers()
 	if err != nil {
 		return nil, err
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 399bcfe..21bea9f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -99,15 +99,14 @@
 	batchFlushTicker    *time.Ticker
 
 	// Channel where app is posting messages to be published
-	eventsChan chan interface{}
-
+	eventsChan      chan interface{}
 	connectClosedCh chan connectionClosed
 
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
 	lastSequenceID   int64
-
-	partitionIdx int32
+	schemaInfo       *SchemaInfo
+	partitionIdx     int32
 }
 
 func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (
@@ -127,6 +126,7 @@
 	}
 
 	logger := client.log.SubLogger(log.Fields{"topic": topic})
+
 	p := &partitionProducer{
 		state:            producerInit,
 		client:           client,
@@ -143,6 +143,12 @@
 		partitionIdx:     int32(partitionIdx),
 	}
 
+	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
+		p.schemaInfo = options.Schema.GetSchemaInfo()
+	} else {
+		p.schemaInfo = nil
+	}
+
 	if options.Name != "" {
 		p.producerName = options.Name
 	}
@@ -175,12 +181,30 @@
 
 	p.log.Debug("Lookup result: ", lr)
 	id := p.client.rpcClient.NewRequestID()
+
+	// set schema info for producer
+
+	pbSchema := new(pb.Schema)
+	if p.schemaInfo != nil {
+		tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
+		pbSchema = &pb.Schema{
+			Name:       proto.String(p.schemaInfo.Name),
+			Type:       &tmpSchemaType,
+			SchemaData: []byte(p.schemaInfo.Schema),
+			Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
+		}
+		p.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
+	} else {
+		pbSchema = nil
+		p.log.Debug("The partition consumer schema is nil")
+	}
+
 	cmdProducer := &pb.CommandProducer{
 		RequestId:  proto.Uint64(id),
 		Topic:      proto.String(p.topic),
 		Encrypted:  nil,
 		ProducerId: proto.Uint64(p.producerID),
-		Schema:     nil,
+		Schema:     pbSchema,
 	}
 
 	if p.producerName != "" {
@@ -311,12 +335,26 @@
 
 	msg := request.msg
 
+	payload := msg.Payload
+	var schemaPayload []byte
+	var err error
+	if p.options.Schema != nil {
+		schemaPayload, err = p.options.Schema.Encode(msg.Value)
+		if err != nil {
+			return
+		}
+	}
+
+	if payload == nil {
+		payload = schemaPayload
+	}
+
 	// if msg is too large
-	if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
+	if len(payload) > int(p.cnx.GetMaxMessageSize()) {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(msg.Payload)).
+			WithField("size", len(payload)).
 			WithField("properties", msg.Properties).
 			Error()
 		publishErrors.Inc()
@@ -333,7 +371,7 @@
 		deliverAt.UnixNano() < 0
 
 	smm := &pb.SingleMessageMetadata{
-		PayloadSize: proto.Int(len(msg.Payload)),
+		PayloadSize: proto.Int(len(payload)),
 	}
 
 	if msg.EventTime.UnixNano() != 0 {
@@ -358,18 +396,18 @@
 	if !sendAsBatch {
 		p.internalFlushCurrentBatch()
 	}
-	added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
+	added := p.batchBuilder.Add(smm, sequenceID, payload, request,
 		msg.ReplicationClusters, deliverAt)
 	if !added {
 		// The current batch is full.. flush it and retry
 		p.internalFlushCurrentBatch()
 
 		// after flushing try again to add the current payload
-		if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
+		if ok := p.batchBuilder.Add(smm, sequenceID, payload, request,
 			msg.ReplicationClusters, deliverAt); !ok {
 			p.publishSemaphore.Release()
 			request.callback(nil, request.msg, errFailAddBatch)
-			p.log.WithField("size", len(msg.Payload)).
+			p.log.WithField("size", len(payload)).
 				WithField("sequenceID", sequenceID).
 				WithField("properties", msg.Properties).
 				Error("unable to add message to batch")
diff --git a/pulsar/schema.go b/pulsar/schema.go
new file mode 100644
index 0000000..7885603
--- /dev/null
+++ b/pulsar/schema.go
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"reflect"
+	"unsafe"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/linkedin/goavro/v2"
+)
+
+type SchemaType int
+
+const (
+	NONE        SchemaType = iota //No schema defined
+	STRING                        //Simple String encoding with UTF-8
+	JSON                          //JSON object encoding and validation
+	PROTOBUF                      //Protobuf message encoding and decoding
+	AVRO                          //Serialize and deserialize via Avro
+	BOOLEAN                       //
+	INT8                          //A 8-byte integer.
+	INT16                         //A 16-byte integer.
+	INT32                         //A 32-byte integer.
+	INT64                         //A 64-byte integer.
+	FLOAT                         //A float number.
+	DOUBLE                        //A double number
+	_                             //
+	_                             //
+	_                             //
+	KeyValue                      //A Schema that contains Key Schema and Value Schema.
+	BYTES       = -1              //A bytes array.
+	AUTO        = -2              //
+	AutoConsume = -3              //Auto Consume Type.
+	AutoPublish = -4              // Auto Publish Type.
+)
+
+// Encapsulates data around the schema definition
+type SchemaInfo struct {
+	Name       string
+	Schema     string
+	Type       SchemaType
+	Properties map[string]string
+}
+
+type Schema interface {
+	Encode(v interface{}) ([]byte, error)
+	Decode(data []byte, v interface{}) error
+	Validate(message []byte) error
+	GetSchemaInfo() *SchemaInfo
+}
+
+type AvroCodec struct {
+	Codec *goavro.Codec
+}
+
+func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec {
+	schemaDef := &AvroCodec{
+		Codec: schema,
+	}
+	return schemaDef
+}
+
+// initAvroCodec returns a Codec used to translate between a byte slice of either
+// binary or textual Avro data and native Go data.
+func initAvroCodec(codec string) (*goavro.Codec, error) {
+	return goavro.NewCodec(codec)
+}
+
+type JSONSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema {
+	js := new(JSONSchema)
+	avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	js.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	js.SchemaInfo.Type = JSON
+	js.SchemaInfo.Properties = properties
+	js.SchemaInfo.Name = "JSON"
+	return js
+}
+
+func (js *JSONSchema) Encode(data interface{}) ([]byte, error) {
+	return json.Marshal(data)
+}
+
+func (js *JSONSchema) Decode(data []byte, v interface{}) error {
+	return json.Unmarshal(data, v)
+}
+
+func (js *JSONSchema) Validate(message []byte) error {
+	return js.Decode(message, nil)
+}
+
+func (js *JSONSchema) GetSchemaInfo() *SchemaInfo {
+	return &js.SchemaInfo
+}
+
+type ProtoSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema {
+	ps := new(ProtoSchema)
+	avroCodec, err := initAvroCodec(protoAvroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	ps.AvroCodec.Codec = schemaDef.Codec
+	ps.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	ps.SchemaInfo.Type = PROTOBUF
+	ps.SchemaInfo.Properties = properties
+	ps.SchemaInfo.Name = "Proto"
+	return ps
+}
+
+func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
+	return proto.Marshal(data.(proto.Message))
+}
+
+func (ps *ProtoSchema) Decode(data []byte, v interface{}) error {
+	return proto.Unmarshal(data, v.(proto.Message))
+}
+
+func (ps *ProtoSchema) Validate(message []byte) error {
+	return ps.Decode(message, nil)
+}
+
+func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
+	return &ps.SchemaInfo
+}
+
+type AvroSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema {
+	as := new(AvroSchema)
+	avroCodec, err := initAvroCodec(avroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	as.AvroCodec.Codec = schemaDef.Codec
+	as.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	as.SchemaInfo.Type = AVRO
+	as.SchemaInfo.Name = "Avro"
+	as.SchemaInfo.Properties = properties
+	return as
+}
+
+func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
+	textual, err := json.Marshal(data)
+	if err != nil {
+		log.Errorf("serialize data error:%s", err.Error())
+		return nil, err
+	}
+	native, _, err := as.Codec.NativeFromTextual(textual)
+	if err != nil {
+		log.Errorf("convert native Go form to binary Avro data error:%s", err.Error())
+		return nil, err
+	}
+	return as.Codec.BinaryFromNative(nil, native)
+}
+
+func (as *AvroSchema) Decode(data []byte, v interface{}) error {
+	native, _, err := as.Codec.NativeFromBinary(data)
+	if err != nil {
+		log.Errorf("convert binary Avro data back to native Go form error:%s", err.Error())
+		return err
+	}
+	textual, err := as.Codec.TextualFromNative(nil, native)
+	if err != nil {
+		log.Errorf("convert native Go form to textual Avro data error:%s", err.Error())
+		return err
+	}
+	err = json.Unmarshal(textual, v)
+	if err != nil {
+		log.Errorf("unSerialize textual error:%s", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (as *AvroSchema) Validate(message []byte) error {
+	return as.Decode(message, nil)
+}
+
+func (as *AvroSchema) GetSchemaInfo() *SchemaInfo {
+	return &as.SchemaInfo
+}
+
+type StringSchema struct {
+	SchemaInfo
+}
+
+func NewStringSchema(properties map[string]string) *StringSchema {
+	strSchema := new(StringSchema)
+	strSchema.SchemaInfo.Properties = properties
+	strSchema.SchemaInfo.Name = "String"
+	strSchema.SchemaInfo.Type = STRING
+	strSchema.SchemaInfo.Schema = ""
+	return strSchema
+}
+
+func (ss *StringSchema) Encode(v interface{}) ([]byte, error) {
+	return []byte(v.(string)), nil
+}
+
+func (ss *StringSchema) Decode(data []byte, v interface{}) error {
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
+	sh := reflect.StringHeader{
+		Data: bh.Data,
+		Len:  bh.Len,
+	}
+	shPtr := (*string)(unsafe.Pointer(&sh))
+	reflect.ValueOf(v).Elem().Set(reflect.ValueOf(shPtr))
+	return nil
+}
+
+func (ss *StringSchema) Validate(message []byte) error {
+	return ss.Decode(message, nil)
+}
+
+func (ss *StringSchema) GetSchemaInfo() *SchemaInfo {
+	return &ss.SchemaInfo
+}
+
+type BytesSchema struct {
+	SchemaInfo
+}
+
+func NewBytesSchema(properties map[string]string) *BytesSchema {
+	bytesSchema := new(BytesSchema)
+	bytesSchema.SchemaInfo.Properties = properties
+	bytesSchema.SchemaInfo.Name = "Bytes"
+	bytesSchema.SchemaInfo.Type = BYTES
+	bytesSchema.SchemaInfo.Schema = ""
+	return bytesSchema
+}
+
+func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) {
+	return data.([]byte), nil
+}
+
+func (bs *BytesSchema) Decode(data []byte, v interface{}) error {
+	reflect.ValueOf(v).Elem().Set(reflect.ValueOf(data))
+	return nil
+}
+
+func (bs *BytesSchema) Validate(message []byte) error {
+	return bs.Decode(message, nil)
+}
+
+func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo {
+	return &bs.SchemaInfo
+}
+
+type Int8Schema struct {
+	SchemaInfo
+}
+
+func NewInt8Schema(properties map[string]string) *Int8Schema {
+	int8Schema := new(Int8Schema)
+	int8Schema.SchemaInfo.Properties = properties
+	int8Schema.SchemaInfo.Schema = ""
+	int8Schema.SchemaInfo.Type = INT8
+	int8Schema.SchemaInfo.Name = "INT8"
+	return int8Schema
+}
+
+func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	err := WriteElements(&buf, value.(int8))
+	return buf.Bytes(), err
+}
+
+func (is8 *Int8Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+func (is8 *Int8Schema) Validate(message []byte) error {
+	if len(message) != 1 {
+		return errors.New("size of data received by Int8Schema is not 1")
+	}
+	return nil
+}
+
+func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo {
+	return &is8.SchemaInfo
+}
+
+type Int16Schema struct {
+	SchemaInfo
+}
+
+func NewInt16Schema(properties map[string]string) *Int16Schema {
+	int16Schema := new(Int16Schema)
+	int16Schema.SchemaInfo.Properties = properties
+	int16Schema.SchemaInfo.Name = "INT16"
+	int16Schema.SchemaInfo.Type = INT16
+	int16Schema.SchemaInfo.Schema = ""
+	return int16Schema
+}
+
+func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	err := WriteElements(&buf, value.(int16))
+	return buf.Bytes(), err
+}
+
+func (is16 *Int16Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+func (is16 *Int16Schema) Validate(message []byte) error {
+	if len(message) != 2 {
+		return errors.New("size of data received by Int16Schema is not 2")
+	}
+	return nil
+}
+
+func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo {
+	return &is16.SchemaInfo
+}
+
+type Int32Schema struct {
+	SchemaInfo
+}
+
+func NewInt32Schema(properties map[string]string) *Int32Schema {
+	int32Schema := new(Int32Schema)
+	int32Schema.SchemaInfo.Properties = properties
+	int32Schema.SchemaInfo.Schema = ""
+	int32Schema.SchemaInfo.Name = "INT32"
+	int32Schema.SchemaInfo.Type = INT32
+	return int32Schema
+}
+
+func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	err := WriteElements(&buf, value.(int32))
+	return buf.Bytes(), err
+}
+
+func (is32 *Int32Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+func (is32 *Int32Schema) Validate(message []byte) error {
+	if len(message) != 4 {
+		return errors.New("size of data received by Int32Schema is not 4")
+	}
+	return nil
+}
+
+func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo {
+	return &is32.SchemaInfo
+}
+
+type Int64Schema struct {
+	SchemaInfo
+}
+
+func NewInt64Schema(properties map[string]string) *Int64Schema {
+	int64Schema := new(Int64Schema)
+	int64Schema.SchemaInfo.Properties = properties
+	int64Schema.SchemaInfo.Name = "INT64"
+	int64Schema.SchemaInfo.Type = INT64
+	int64Schema.SchemaInfo.Schema = ""
+	return int64Schema
+}
+
+func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	err := WriteElements(&buf, value.(int64))
+	return buf.Bytes(), err
+}
+
+func (is64 *Int64Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+func (is64 *Int64Schema) Validate(message []byte) error {
+	if len(message) != 8 {
+		return errors.New("size of data received by Int64Schema is not 8")
+	}
+	return nil
+}
+
+func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo {
+	return &is64.SchemaInfo
+}
+
+type FloatSchema struct {
+	SchemaInfo
+}
+
+func NewFloatSchema(properties map[string]string) *FloatSchema {
+	floatSchema := new(FloatSchema)
+	floatSchema.SchemaInfo.Properties = properties
+	floatSchema.SchemaInfo.Type = FLOAT
+	floatSchema.SchemaInfo.Name = "FLOAT"
+	floatSchema.SchemaInfo.Schema = ""
+	return floatSchema
+}
+
+func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) {
+	return BinarySerializer.PutFloat(value)
+}
+
+func (fs *FloatSchema) Decode(data []byte, v interface{}) error {
+	floatValue, err := BinarySerializer.Float32(data)
+	if err != nil {
+		log.Errorf("unSerialize float error:%s", err.Error())
+		return err
+	}
+	reflect.ValueOf(v).Elem().Set(reflect.ValueOf(floatValue))
+	return nil
+}
+
+func (fs *FloatSchema) Validate(message []byte) error {
+	if len(message) != 4 {
+		return errors.New("size of data received by FloatSchema is not 4")
+	}
+	return nil
+}
+
+func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo {
+	return &fs.SchemaInfo
+}
+
+type DoubleSchema struct {
+	SchemaInfo
+}
+
+func NewDoubleSchema(properties map[string]string) *DoubleSchema {
+	doubleSchema := new(DoubleSchema)
+	doubleSchema.SchemaInfo.Properties = properties
+	doubleSchema.SchemaInfo.Type = DOUBLE
+	doubleSchema.SchemaInfo.Name = "DOUBLE"
+	doubleSchema.SchemaInfo.Schema = ""
+	return doubleSchema
+}
+
+func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) {
+	return BinarySerializer.PutDouble(value)
+}
+
+func (ds *DoubleSchema) Decode(data []byte, v interface{}) error {
+	doubleValue, err := BinarySerializer.Float64(data)
+	if err != nil {
+		log.Errorf("unSerialize double value error:%s", err.Error())
+		return err
+	}
+	reflect.ValueOf(v).Elem().Set(reflect.ValueOf(doubleValue))
+	return nil
+}
+
+func (ds *DoubleSchema) Validate(message []byte) error {
+	if len(message) != 8 {
+		return errors.New("size of data received by DoubleSchema is not 8")
+	}
+	return nil
+}
+
+func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo {
+	return &ds.SchemaInfo
+}
diff --git a/pulsar/schema_def_test.go b/pulsar/schema_def_test.go
new file mode 100644
index 0000000..5c32cc5
--- /dev/null
+++ b/pulsar/schema_def_test.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSchemaDef(t *testing.T) {
+	errSchemaDef := `{"type":"record","name":"Example","namespace":"test"," +
+		""fields":[{"name":"ID","type":"int64"},{"name":"Name","type":"string"}]}`
+
+	_, err := initAvroCodec(errSchemaDef)
+	assert.NotNil(t, err)
+
+	errSchemaDef1 := `"{"type":"record","name":"Example","namespace":"test"," +
+		""fields":[{"name":"ID","type":"bool"},{"name":"Name","type":"string"}]}`
+	_, err = initAvroCodec(errSchemaDef1)
+	assert.NotNil(t, err)
+
+	errSchemaDef2 := `"{"type":"record","name":"Example","namespace":"test"," +
+		""fields":[{"name":"ID","type":"float32"},{"name":"Name","type":"string"}]}`
+	_, err = initAvroCodec(errSchemaDef2)
+	assert.NotNil(t, err)
+
+	errSchemaDef3 := `{"type":"record","name":"Example","namespace":"test"," +
+		""fields":[{"name":"ID","type":"float64"},{"name":"Name","type":"string"}]}`
+	_, err = initAvroCodec(errSchemaDef3)
+	assert.NotNil(t, err)
+
+	errSchemaDef4 := `{"type":"record","name":"Example","namespace":"test"," +
+		""fields":[{"name":"ID","type":"byte"},{"name":"Name","type":"string"}]}`
+	_, err = initAvroCodec(errSchemaDef4)
+	assert.NotNil(t, err)
+
+	errSchemaDef5 := `{"type":"record","name":"Example","namespace":"operation.createJsonConsumer"," +
+		""fields":[{"name":"ID","type":"byte"},{"name":"Name","" +
+		"type":":["null","string"],"default":null"}]}`
+	_, err = initAvroCodec(errSchemaDef5)
+	assert.NotNil(t, err)
+}
diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go
new file mode 100644
index 0000000..ce19bac
--- /dev/null
+++ b/pulsar/schema_test.go
@@ -0,0 +1,422 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"log"
+	"testing"
+
+	"github.com/apache/pulsar-client-go/integration-tests/pb"
+	"github.com/stretchr/testify/assert"
+)
+
+type testJSON struct {
+	ID   int    `json:"id"`
+	Name string `json:"name"`
+}
+
+type testAvro struct {
+	ID   int
+	Name string
+}
+
+var (
+	exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+		"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+	protoSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+		"\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"msf\",\"type\":\"string\"}]}"
+)
+
+func createClient() Client {
+	// create client
+	lookupURL := "pulsar://localhost:6650"
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	return client
+}
+
+func TestJsonSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	properties := make(map[string]string)
+	properties["pulsar"] = "hello"
+	jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:  "jsonTopic",
+		Schema: jsonSchemaWithProperties,
+	})
+	assert.Nil(t, err)
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Value: &testJSON{
+			ID:   100,
+			Name: "pulsar",
+		},
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	producer1.Close()
+
+	//create consumer
+	var s testJSON
+
+	consumerJS := NewJSONSchema(exampleSchemaDef, nil)
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "jsonTopic",
+		SubscriptionName:            "sub-1",
+		Schema:                      consumerJS,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&s)
+	assert.Nil(t, err)
+	assert.Equal(t, s.ID, 100)
+	assert.Equal(t, s.Name, "pulsar")
+
+	defer consumer.Close()
+}
+
+func TestProtoSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// create producer
+	psProducer := NewProtoSchema(protoSchemaDef, nil)
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "proto",
+		Schema: psProducer,
+	})
+	assert.Nil(t, err)
+
+	if _, err := producer.Send(context.Background(), &ProducerMessage{
+		Value: &pb.Test{
+			Num: 100,
+			Msf: "pulsar",
+		},
+	}); err != nil {
+		log.Fatal(err)
+	}
+
+	//create consumer
+	unobj := pb.Test{}
+	psConsumer := NewProtoSchema(protoSchemaDef, nil)
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "proto",
+		SubscriptionName:            "sub-1",
+		Schema:                      psConsumer,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&unobj)
+	assert.Nil(t, err)
+	assert.Equal(t, unobj.Num, int32(100))
+	assert.Equal(t, unobj.Msf, "pulsar")
+	defer consumer.Close()
+}
+
+func TestAvroSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// create producer
+	asProducer := NewAvroSchema(exampleSchemaDef, nil)
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "avro-topic",
+		Schema: asProducer,
+	})
+	assert.Nil(t, err)
+	if _, err := producer.Send(context.Background(), &ProducerMessage{
+		Value: testAvro{
+			ID:   100,
+			Name: "pulsar",
+		},
+	}); err != nil {
+		log.Fatal(err)
+	}
+
+	//create consumer
+	unobj := testAvro{}
+
+	asConsumer := NewAvroSchema(exampleSchemaDef, nil)
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "avro-topic",
+		SubscriptionName:            "sub-1",
+		Schema:                      asConsumer,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&unobj)
+	assert.Nil(t, err)
+	assert.Equal(t, unobj.ID, 100)
+	assert.Equal(t, unobj.Name, "pulsar")
+	defer consumer.Close()
+}
+
+func TestStringSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	ssProducer := NewStringSchema(nil)
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "strTopic",
+		Schema: ssProducer,
+	})
+	assert.Nil(t, err)
+	if _, err := producer.Send(context.Background(), &ProducerMessage{
+		Value: "hello pulsar",
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	var res *string
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "strTopic",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewStringSchema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, *res, "hello pulsar")
+
+	defer consumer.Close()
+}
+
+func TestInt8Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "int8Topic1",
+		Schema: NewInt8Schema(nil),
+	})
+	assert.Nil(t, err)
+	ctx := context.Background()
+	if _, err := producer.Send(ctx, &ProducerMessage{
+		Value: int8(1),
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "int8Topic1",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewInt8Schema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	var res int8
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, res, int8(1))
+
+	defer consumer.Close()
+}
+
+func TestInt16Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "int16Topic",
+		Schema: NewInt16Schema(nil),
+	})
+	assert.Nil(t, err)
+	ctx := context.Background()
+	if _, err := producer.Send(ctx, &ProducerMessage{
+		Value: int16(1),
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "int16Topic",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewInt16Schema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	var res int16
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, res, int16(1))
+	defer consumer.Close()
+}
+
+func TestInt32Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "int32Topic1",
+		Schema: NewInt32Schema(nil),
+	})
+	assert.Nil(t, err)
+	ctx := context.Background()
+	if _, err := producer.Send(ctx, &ProducerMessage{
+		Value: int32(1),
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "int32Topic1",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewInt32Schema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	var res int32
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, res, int32(1))
+	defer consumer.Close()
+}
+
+func TestInt64Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "int64Topic",
+		Schema: NewInt64Schema(nil),
+	})
+	assert.Nil(t, err)
+	ctx := context.Background()
+	if _, err := producer.Send(ctx, &ProducerMessage{
+		Value: int64(1),
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "int64Topic",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewInt64Schema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	var res int64
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, res, int64(1))
+	defer consumer.Close()
+}
+
+func TestFloatSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "floatTopic",
+		Schema: NewFloatSchema(nil),
+	})
+	assert.Nil(t, err)
+	if _, err := producer.Send(context.Background(), &ProducerMessage{
+		Value: float32(1),
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "floatTopic",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewFloatSchema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	var res float32
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, res, float32(1))
+	defer consumer.Close()
+}
+
+func TestDoubleSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  "doubleTopic",
+		Schema: NewDoubleSchema(nil),
+	})
+	assert.Nil(t, err)
+	ctx := context.Background()
+	if _, err := producer.Send(ctx, &ProducerMessage{
+		Value: float64(1),
+	}); err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "doubleTopic",
+		SubscriptionName:            "sub-2",
+		Schema:                      NewDoubleSchema(nil),
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+
+	var res float64
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, res, float64(1))
+	defer consumer.Close()
+}