Merge master code and fix conflict

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
diff --git a/examples/consumer-listener/consumer-listener.go b/examples/consumer-listener/consumer-listener.go
new file mode 100644
index 0000000..c20d731
--- /dev/null
+++ b/examples/consumer-listener/consumer-listener.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+    "fmt"
+    "log"
+
+    "github.com/apache/pulsar-client-go/pulsar"
+)
+
+func main() {
+    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    defer client.Close()
+
+    channel := make(chan pulsar.ConsumerMessage, 100)
+
+    options := pulsar.ConsumerOptions{
+        Topic:            "topic-1",
+        SubscriptionName: "my-subscription",
+        Type:             pulsar.Shared,
+    }
+
+    options.MessageChannel = channel
+
+    consumer, err := client.Subscribe(options)
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    defer consumer.Close()
+
+    // Receive messages from channel. The channel returns a struct which contains message and the consumer from where
+    // the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
+    // shared across multiple consumers as well
+    for cm := range channel {
+        msg := cm.Message
+        fmt.Printf("Received message  msgId: %v -- content: '%s'\n",
+            msg.ID(), string(msg.Payload()))
+
+        if err := consumer.Ack(msg); err != nil {
+            log.Fatal(err)
+        }
+    }
+}
diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
new file mode 100644
index 0000000..3010853
--- /dev/null
+++ b/examples/consumer/consumer.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+    "context"
+    "fmt"
+    `github.com/apache/pulsar-client-go/pulsar`
+    `log`
+)
+
+func main() {
+    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    defer client.Close()
+
+    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+        Topic:            "topic-1",
+        SubscriptionName: "my-sub",
+        Type:             pulsar.Shared,
+    })
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer consumer.Close()
+
+    for i := 0; i < 10; i++ {
+        msg, err := consumer.Receive(context.Background())
+        if err != nil {
+            log.Fatal(err)
+        }
+
+        fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+            msg.ID(), string(msg.Payload()))
+
+        if err := consumer.Ack(msg); err != nil {
+            log.Fatal(err)
+        }
+    }
+
+    if err := consumer.Unsubscribe(); err != nil {
+        log.Fatal(err)
+    }
+}
diff --git a/examples/producer/producer.go b/examples/producer/producer.go
new file mode 100644
index 0000000..56b87b5
--- /dev/null
+++ b/examples/producer/producer.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+    `context`
+    `fmt`
+    `github.com/apache/pulsar-client-go/pulsar`
+    `log`
+)
+
+func main() {
+    client, err := pulsar.NewClient(pulsar.ClientOptions{
+        URL: "pulsar://localhost:6650",
+    })
+
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    defer client.Close()
+
+    producer, err := client.CreateProducer(pulsar.ProducerOptions{
+        Topic: "topic-1",
+    })
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    defer producer.Close()
+
+    ctx := context.Background()
+
+    for i := 0; i < 10; i++ {
+        if err := producer.Send(ctx, &pulsar.ProducerMessage{
+            Payload: []byte(fmt.Sprintf("hello-%d", i)),
+        }); err != nil {
+            log.Fatal(err)
+        }
+    }
+}
diff --git a/go.mod b/go.mod
index 2433c96..dddb428 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,16 @@
 go 1.12
 
 require (
+	github.com/DataDog/zstd v1.4.0 // indirect
 	github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
 	github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
+	github.com/cespare/xxhash v1.1.0 // indirect
+	github.com/deckarep/golang-set v1.7.1
 	github.com/golang/protobuf v1.3.1
+	github.com/google/go-cmp v0.3.0 // indirect
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
+	github.com/klauspost/compress v1.6.2
+	github.com/klauspost/cpuid v1.2.1 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/pkg/errors v0.8.1
 	github.com/sirupsen/logrus v1.4.1
diff --git a/go.sum b/go.sum
index 0fc836c..a1ccf2f 100644
--- a/go.sum
+++ b/go.sum
@@ -1,14 +1,28 @@
+github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo=
+github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
+github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
 github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/klauspost/compress v1.6.2 h1:D9kM6nOc1x+yA/DW/k81uG1xdmwqCMQ/A266P1edQEw=
+github.com/klauspost/compress v1.6.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
+github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
@@ -19,6 +33,7 @@
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index 449c205..b391336 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -21,8 +21,7 @@
 	"github.com/pierrec/lz4"
 )
 
-type lz4Provider struct {
-}
+type lz4Provider struct {}
 
 // NewLz4Provider return a interface of Provider.
 func NewLz4Provider() Provider {
diff --git a/pkg/pb/PulsarApi.pb.go b/pkg/pb/PulsarApi.pb.go
index 1406462..f47feed 100644
--- a/pkg/pb/PulsarApi.pb.go
+++ b/pkg/pb/PulsarApi.pb.go
@@ -21,10 +21,11 @@
 type CompressionType int32
 
 const (
-	CompressionType_NONE CompressionType = 0
-	CompressionType_LZ4  CompressionType = 1
-	CompressionType_ZLIB CompressionType = 2
-	CompressionType_ZSTD CompressionType = 3
+	CompressionType_NONE   CompressionType = 0
+	CompressionType_LZ4    CompressionType = 1
+	CompressionType_ZLIB   CompressionType = 2
+	CompressionType_ZSTD   CompressionType = 3
+	CompressionType_SNAPPY CompressionType = 4
 )
 
 var CompressionType_name = map[int32]string{
@@ -32,12 +33,14 @@
 	1: "LZ4",
 	2: "ZLIB",
 	3: "ZSTD",
+	4: "SNAPPY",
 }
 var CompressionType_value = map[string]int32{
-	"NONE": 0,
-	"LZ4":  1,
-	"ZLIB": 2,
-	"ZSTD": 3,
+	"NONE":   0,
+	"LZ4":    1,
+	"ZLIB":   2,
+	"ZSTD":   3,
+	"SNAPPY": 4,
 }
 
 func (x CompressionType) Enum() *CompressionType {
@@ -57,7 +60,7 @@
 	return nil
 }
 func (CompressionType) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{0}
 }
 
 type ServerError int32
@@ -83,6 +86,7 @@
 	ServerError_ProducerBusy                          ServerError = 16
 	ServerError_InvalidTopicName                      ServerError = 17
 	ServerError_IncompatibleSchema                    ServerError = 18
+	ServerError_ConsumerAssignError                   ServerError = 19
 )
 
 var ServerError_name = map[int32]string{
@@ -105,6 +109,7 @@
 	16: "ProducerBusy",
 	17: "InvalidTopicName",
 	18: "IncompatibleSchema",
+	19: "ConsumerAssignError",
 }
 var ServerError_value = map[string]int32{
 	"UnknownError":                          0,
@@ -126,6 +131,7 @@
 	"ProducerBusy":                          16,
 	"InvalidTopicName":                      17,
 	"IncompatibleSchema":                    18,
+	"ConsumerAssignError":                   19,
 }
 
 func (x ServerError) Enum() *ServerError {
@@ -145,7 +151,7 @@
 	return nil
 }
 func (ServerError) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{1}
 }
 
 type AuthMethod int32
@@ -184,7 +190,7 @@
 	return nil
 }
 func (AuthMethod) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{2}
 }
 
 // Each protocol version identify new features that are
@@ -208,6 +214,7 @@
 	// Added CommandActiveConsumerChange
 	// Added CommandGetTopicsOfNamespace
 	ProtocolVersion_v13 ProtocolVersion = 13
+	ProtocolVersion_v14 ProtocolVersion = 14
 )
 
 var ProtocolVersion_name = map[int32]string{
@@ -225,6 +232,7 @@
 	11: "v11",
 	12: "v12",
 	13: "v13",
+	14: "v14",
 }
 var ProtocolVersion_value = map[string]int32{
 	"v0":  0,
@@ -241,6 +249,7 @@
 	"v11": 11,
 	"v12": 12,
 	"v13": 13,
+	"v14": 14,
 }
 
 func (x ProtocolVersion) Enum() *ProtocolVersion {
@@ -260,32 +269,65 @@
 	return nil
 }
 func (ProtocolVersion) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{3}
 }
 
 type Schema_Type int32
 
 const (
-	Schema_None     Schema_Type = 0
-	Schema_String   Schema_Type = 1
-	Schema_Json     Schema_Type = 2
-	Schema_Protobuf Schema_Type = 3
-	Schema_Avro     Schema_Type = 4
+	Schema_None      Schema_Type = 0
+	Schema_String    Schema_Type = 1
+	Schema_Json      Schema_Type = 2
+	Schema_Protobuf  Schema_Type = 3
+	Schema_Avro      Schema_Type = 4
+	Schema_Bool      Schema_Type = 5
+	Schema_Int8      Schema_Type = 6
+	Schema_Int16     Schema_Type = 7
+	Schema_Int32     Schema_Type = 8
+	Schema_Int64     Schema_Type = 9
+	Schema_Float     Schema_Type = 10
+	Schema_Double    Schema_Type = 11
+	Schema_Date      Schema_Type = 12
+	Schema_Time      Schema_Type = 13
+	Schema_Timestamp Schema_Type = 14
+	Schema_KeyValue  Schema_Type = 15
 )
 
 var Schema_Type_name = map[int32]string{
-	0: "None",
-	1: "String",
-	2: "Json",
-	3: "Protobuf",
-	4: "Avro",
+	0:  "None",
+	1:  "String",
+	2:  "Json",
+	3:  "Protobuf",
+	4:  "Avro",
+	5:  "Bool",
+	6:  "Int8",
+	7:  "Int16",
+	8:  "Int32",
+	9:  "Int64",
+	10: "Float",
+	11: "Double",
+	12: "Date",
+	13: "Time",
+	14: "Timestamp",
+	15: "KeyValue",
 }
 var Schema_Type_value = map[string]int32{
-	"None":     0,
-	"String":   1,
-	"Json":     2,
-	"Protobuf": 3,
-	"Avro":     4,
+	"None":      0,
+	"String":    1,
+	"Json":      2,
+	"Protobuf":  3,
+	"Avro":      4,
+	"Bool":      5,
+	"Int8":      6,
+	"Int16":     7,
+	"Int32":     8,
+	"Int64":     9,
+	"Float":     10,
+	"Double":    11,
+	"Date":      12,
+	"Time":      13,
+	"Timestamp": 14,
+	"KeyValue":  15,
 }
 
 func (x Schema_Type) Enum() *Schema_Type {
@@ -305,26 +347,29 @@
 	return nil
 }
 func (Schema_Type) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{0, 0}
 }
 
 type CommandSubscribe_SubType int32
 
 const (
-	CommandSubscribe_Exclusive CommandSubscribe_SubType = 0
-	CommandSubscribe_Shared    CommandSubscribe_SubType = 1
-	CommandSubscribe_Failover  CommandSubscribe_SubType = 2
+	CommandSubscribe_Exclusive  CommandSubscribe_SubType = 0
+	CommandSubscribe_Shared     CommandSubscribe_SubType = 1
+	CommandSubscribe_Failover   CommandSubscribe_SubType = 2
+	CommandSubscribe_Key_Shared CommandSubscribe_SubType = 3
 )
 
 var CommandSubscribe_SubType_name = map[int32]string{
 	0: "Exclusive",
 	1: "Shared",
 	2: "Failover",
+	3: "Key_Shared",
 }
 var CommandSubscribe_SubType_value = map[string]int32{
-	"Exclusive": 0,
-	"Shared":    1,
-	"Failover":  2,
+	"Exclusive":  0,
+	"Shared":     1,
+	"Failover":   2,
+	"Key_Shared": 3,
 }
 
 func (x CommandSubscribe_SubType) Enum() *CommandSubscribe_SubType {
@@ -344,7 +389,7 @@
 	return nil
 }
 func (CommandSubscribe_SubType) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{12, 0}
 }
 
 type CommandSubscribe_InitialPosition int32
@@ -380,7 +425,7 @@
 	return nil
 }
 func (CommandSubscribe_InitialPosition) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 1}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{12, 1}
 }
 
 type CommandPartitionedTopicMetadataResponse_LookupType int32
@@ -416,7 +461,7 @@
 	return nil
 }
 func (CommandPartitionedTopicMetadataResponse_LookupType) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{14, 0}
 }
 
 type CommandLookupTopicResponse_LookupType int32
@@ -455,7 +500,7 @@
 	return nil
 }
 func (CommandLookupTopicResponse_LookupType) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{16, 0}
 }
 
 type CommandAck_AckType int32
@@ -491,7 +536,7 @@
 	return nil
 }
 func (CommandAck_AckType) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{22, 0}
 }
 
 // Acks can contain a flag to indicate the consumer
@@ -539,7 +584,7 @@
 	return nil
 }
 func (CommandAck_ValidationError) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 1}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{22, 1}
 }
 
 type CommandGetTopicsOfNamespace_Mode int32
@@ -578,7 +623,7 @@
 	return nil
 }
 func (CommandGetTopicsOfNamespace_Mode) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{40, 0}
 }
 
 type BaseCommand_Type int32
@@ -618,6 +663,8 @@
 	BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE  BaseCommand_Type = 33
 	BaseCommand_GET_SCHEMA                        BaseCommand_Type = 34
 	BaseCommand_GET_SCHEMA_RESPONSE               BaseCommand_Type = 35
+	BaseCommand_AUTH_CHALLENGE                    BaseCommand_Type = 36
+	BaseCommand_AUTH_RESPONSE                     BaseCommand_Type = 37
 )
 
 var BaseCommand_Type_name = map[int32]string{
@@ -655,6 +702,8 @@
 	33: "GET_TOPICS_OF_NAMESPACE_RESPONSE",
 	34: "GET_SCHEMA",
 	35: "GET_SCHEMA_RESPONSE",
+	36: "AUTH_CHALLENGE",
+	37: "AUTH_RESPONSE",
 }
 var BaseCommand_Type_value = map[string]int32{
 	"CONNECT":                           2,
@@ -691,6 +740,8 @@
 	"GET_TOPICS_OF_NAMESPACE_RESPONSE":  33,
 	"GET_SCHEMA":                        34,
 	"GET_SCHEMA_RESPONSE":               35,
+	"AUTH_CHALLENGE":                    36,
+	"AUTH_RESPONSE":                     37,
 }
 
 func (x BaseCommand_Type) Enum() *BaseCommand_Type {
@@ -710,7 +761,7 @@
 	return nil
 }
 func (BaseCommand_Type) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41, 0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{44, 0}
 }
 
 type Schema struct {
@@ -727,7 +778,7 @@
 func (m *Schema) String() string { return proto.CompactTextString(m) }
 func (*Schema) ProtoMessage()    {}
 func (*Schema) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{0}
 }
 func (m *Schema) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_Schema.Unmarshal(m, b)
@@ -789,7 +840,7 @@
 func (m *MessageIdData) String() string { return proto.CompactTextString(m) }
 func (*MessageIdData) ProtoMessage()    {}
 func (*MessageIdData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{1}
 }
 func (m *MessageIdData) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_MessageIdData.Unmarshal(m, b)
@@ -852,7 +903,7 @@
 func (m *KeyValue) String() string { return proto.CompactTextString(m) }
 func (*KeyValue) ProtoMessage()    {}
 func (*KeyValue) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{2}
 }
 func (m *KeyValue) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_KeyValue.Unmarshal(m, b)
@@ -898,7 +949,7 @@
 func (m *KeyLongValue) String() string { return proto.CompactTextString(m) }
 func (*KeyLongValue) ProtoMessage()    {}
 func (*KeyLongValue) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{3}
 }
 func (m *KeyLongValue) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_KeyLongValue.Unmarshal(m, b)
@@ -945,7 +996,7 @@
 func (m *EncryptionKeys) String() string { return proto.CompactTextString(m) }
 func (*EncryptionKeys) ProtoMessage()    {}
 func (*EncryptionKeys) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{4}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{4}
 }
 func (m *EncryptionKeys) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_EncryptionKeys.Unmarshal(m, b)
@@ -1013,19 +1064,27 @@
 	// Algorithm used to encrypt data key
 	EncryptionAlgo *string `protobuf:"bytes,14,opt,name=encryption_algo,json=encryptionAlgo" json:"encryption_algo,omitempty"`
 	// Additional parameters required by encryption
-	EncryptionParam        []byte   `protobuf:"bytes,15,opt,name=encryption_param,json=encryptionParam" json:"encryption_param,omitempty"`
-	SchemaVersion          []byte   `protobuf:"bytes,16,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
-	PartitionKeyB64Encoded *bool    `protobuf:"varint,17,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
-	XXX_NoUnkeyedLiteral   struct{} `json:"-"`
-	XXX_unrecognized       []byte   `json:"-"`
-	XXX_sizecache          int32    `json:"-"`
+	EncryptionParam        []byte `protobuf:"bytes,15,opt,name=encryption_param,json=encryptionParam" json:"encryption_param,omitempty"`
+	SchemaVersion          []byte `protobuf:"bytes,16,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+	PartitionKeyB64Encoded *bool  `protobuf:"varint,17,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+	// Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
+	OrderingKey []byte `protobuf:"bytes,18,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
+	// Mark the message to be delivered at or after the specified timestamp
+	DeliverAtTime *int64 `protobuf:"varint,19,opt,name=deliver_at_time,json=deliverAtTime" json:"deliver_at_time,omitempty"`
+	// Identify whether a message is a "marker" message used for
+	// internal metadata instead of application published data.
+	// Markers will generally not be propagated back to clients
+	MarkerType           *int32   `protobuf:"varint,20,opt,name=marker_type,json=markerType" json:"marker_type,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
 }
 
 func (m *MessageMetadata) Reset()         { *m = MessageMetadata{} }
 func (m *MessageMetadata) String() string { return proto.CompactTextString(m) }
 func (*MessageMetadata) ProtoMessage()    {}
 func (*MessageMetadata) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{5}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{5}
 }
 func (m *MessageMetadata) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_MessageMetadata.Unmarshal(m, b)
@@ -1163,6 +1222,27 @@
 	return Default_MessageMetadata_PartitionKeyB64Encoded
 }
 
+func (m *MessageMetadata) GetOrderingKey() []byte {
+	if m != nil {
+		return m.OrderingKey
+	}
+	return nil
+}
+
+func (m *MessageMetadata) GetDeliverAtTime() int64 {
+	if m != nil && m.DeliverAtTime != nil {
+		return *m.DeliverAtTime
+	}
+	return 0
+}
+
+func (m *MessageMetadata) GetMarkerType() int32 {
+	if m != nil && m.MarkerType != nil {
+		return *m.MarkerType
+	}
+	return 0
+}
+
 type SingleMessageMetadata struct {
 	Properties   []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
 	PartitionKey *string     `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
@@ -1170,18 +1250,20 @@
 	CompactedOut *bool       `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
 	// the timestamp that this event occurs. it is typically set by applications.
 	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
-	EventTime              *uint64  `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
-	PartitionKeyB64Encoded *bool    `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
-	XXX_NoUnkeyedLiteral   struct{} `json:"-"`
-	XXX_unrecognized       []byte   `json:"-"`
-	XXX_sizecache          int32    `json:"-"`
+	EventTime              *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
+	PartitionKeyB64Encoded *bool   `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+	// Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
+	OrderingKey          []byte   `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
 }
 
 func (m *SingleMessageMetadata) Reset()         { *m = SingleMessageMetadata{} }
 func (m *SingleMessageMetadata) String() string { return proto.CompactTextString(m) }
 func (*SingleMessageMetadata) ProtoMessage()    {}
 func (*SingleMessageMetadata) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{6}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{6}
 }
 func (m *SingleMessageMetadata) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_SingleMessageMetadata.Unmarshal(m, b)
@@ -1247,6 +1329,13 @@
 	return Default_SingleMessageMetadata_PartitionKeyB64Encoded
 }
 
+func (m *SingleMessageMetadata) GetOrderingKey() []byte {
+	if m != nil {
+		return m.OrderingKey
+	}
+	return nil
+}
+
 type CommandConnect struct {
 	ClientVersion   *string     `protobuf:"bytes,1,req,name=client_version,json=clientVersion" json:"client_version,omitempty"`
 	AuthMethod      *AuthMethod `protobuf:"varint,2,opt,name=auth_method,json=authMethod,enum=pulsar.proto.AuthMethod" json:"auth_method,omitempty"`
@@ -1274,7 +1363,7 @@
 func (m *CommandConnect) String() string { return proto.CompactTextString(m) }
 func (*CommandConnect) ProtoMessage()    {}
 func (*CommandConnect) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{7}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{7}
 }
 func (m *CommandConnect) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandConnect.Unmarshal(m, b)
@@ -1362,6 +1451,7 @@
 type CommandConnected struct {
 	ServerVersion        *string  `protobuf:"bytes,1,req,name=server_version,json=serverVersion" json:"server_version,omitempty"`
 	ProtocolVersion      *int32   `protobuf:"varint,2,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+	MaxMessageSize       *int32   `protobuf:"varint,3,opt,name=max_message_size,json=maxMessageSize" json:"max_message_size,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -1371,7 +1461,7 @@
 func (m *CommandConnected) String() string { return proto.CompactTextString(m) }
 func (*CommandConnected) ProtoMessage()    {}
 func (*CommandConnected) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{8}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{8}
 }
 func (m *CommandConnected) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandConnected.Unmarshal(m, b)
@@ -1407,6 +1497,172 @@
 	return Default_CommandConnected_ProtocolVersion
 }
 
+func (m *CommandConnected) GetMaxMessageSize() int32 {
+	if m != nil && m.MaxMessageSize != nil {
+		return *m.MaxMessageSize
+	}
+	return 0
+}
+
+type CommandAuthResponse struct {
+	ClientVersion        *string   `protobuf:"bytes,1,opt,name=client_version,json=clientVersion" json:"client_version,omitempty"`
+	Response             *AuthData `protobuf:"bytes,2,opt,name=response" json:"response,omitempty"`
+	ProtocolVersion      *int32    `protobuf:"varint,3,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}  `json:"-"`
+	XXX_unrecognized     []byte    `json:"-"`
+	XXX_sizecache        int32     `json:"-"`
+}
+
+func (m *CommandAuthResponse) Reset()         { *m = CommandAuthResponse{} }
+func (m *CommandAuthResponse) String() string { return proto.CompactTextString(m) }
+func (*CommandAuthResponse) ProtoMessage()    {}
+func (*CommandAuthResponse) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{9}
+}
+func (m *CommandAuthResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandAuthResponse.Unmarshal(m, b)
+}
+func (m *CommandAuthResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandAuthResponse.Marshal(b, m, deterministic)
+}
+func (dst *CommandAuthResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandAuthResponse.Merge(dst, src)
+}
+func (m *CommandAuthResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandAuthResponse.Size(m)
+}
+func (m *CommandAuthResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandAuthResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandAuthResponse proto.InternalMessageInfo
+
+const Default_CommandAuthResponse_ProtocolVersion int32 = 0
+
+func (m *CommandAuthResponse) GetClientVersion() string {
+	if m != nil && m.ClientVersion != nil {
+		return *m.ClientVersion
+	}
+	return ""
+}
+
+func (m *CommandAuthResponse) GetResponse() *AuthData {
+	if m != nil {
+		return m.Response
+	}
+	return nil
+}
+
+func (m *CommandAuthResponse) GetProtocolVersion() int32 {
+	if m != nil && m.ProtocolVersion != nil {
+		return *m.ProtocolVersion
+	}
+	return Default_CommandAuthResponse_ProtocolVersion
+}
+
+type CommandAuthChallenge struct {
+	ServerVersion        *string   `protobuf:"bytes,1,opt,name=server_version,json=serverVersion" json:"server_version,omitempty"`
+	Challenge            *AuthData `protobuf:"bytes,2,opt,name=challenge" json:"challenge,omitempty"`
+	ProtocolVersion      *int32    `protobuf:"varint,3,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}  `json:"-"`
+	XXX_unrecognized     []byte    `json:"-"`
+	XXX_sizecache        int32     `json:"-"`
+}
+
+func (m *CommandAuthChallenge) Reset()         { *m = CommandAuthChallenge{} }
+func (m *CommandAuthChallenge) String() string { return proto.CompactTextString(m) }
+func (*CommandAuthChallenge) ProtoMessage()    {}
+func (*CommandAuthChallenge) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{10}
+}
+func (m *CommandAuthChallenge) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandAuthChallenge.Unmarshal(m, b)
+}
+func (m *CommandAuthChallenge) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandAuthChallenge.Marshal(b, m, deterministic)
+}
+func (dst *CommandAuthChallenge) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandAuthChallenge.Merge(dst, src)
+}
+func (m *CommandAuthChallenge) XXX_Size() int {
+	return xxx_messageInfo_CommandAuthChallenge.Size(m)
+}
+func (m *CommandAuthChallenge) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandAuthChallenge.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandAuthChallenge proto.InternalMessageInfo
+
+const Default_CommandAuthChallenge_ProtocolVersion int32 = 0
+
+func (m *CommandAuthChallenge) GetServerVersion() string {
+	if m != nil && m.ServerVersion != nil {
+		return *m.ServerVersion
+	}
+	return ""
+}
+
+func (m *CommandAuthChallenge) GetChallenge() *AuthData {
+	if m != nil {
+		return m.Challenge
+	}
+	return nil
+}
+
+func (m *CommandAuthChallenge) GetProtocolVersion() int32 {
+	if m != nil && m.ProtocolVersion != nil {
+		return *m.ProtocolVersion
+	}
+	return Default_CommandAuthChallenge_ProtocolVersion
+}
+
+// To support mutual authentication type, such as Sasl, reuse this command to mutual auth.
+type AuthData struct {
+	AuthMethodName       *string  `protobuf:"bytes,1,opt,name=auth_method_name,json=authMethodName" json:"auth_method_name,omitempty"`
+	AuthData             []byte   `protobuf:"bytes,2,opt,name=auth_data,json=authData" json:"auth_data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *AuthData) Reset()         { *m = AuthData{} }
+func (m *AuthData) String() string { return proto.CompactTextString(m) }
+func (*AuthData) ProtoMessage()    {}
+func (*AuthData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{11}
+}
+func (m *AuthData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_AuthData.Unmarshal(m, b)
+}
+func (m *AuthData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_AuthData.Marshal(b, m, deterministic)
+}
+func (dst *AuthData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_AuthData.Merge(dst, src)
+}
+func (m *AuthData) XXX_Size() int {
+	return xxx_messageInfo_AuthData.Size(m)
+}
+func (m *AuthData) XXX_DiscardUnknown() {
+	xxx_messageInfo_AuthData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_AuthData proto.InternalMessageInfo
+
+func (m *AuthData) GetAuthMethodName() string {
+	if m != nil && m.AuthMethodName != nil {
+		return *m.AuthMethodName
+	}
+	return ""
+}
+
+func (m *AuthData) GetAuthData() []byte {
+	if m != nil {
+		return m.AuthData
+	}
+	return nil
+}
+
 type CommandSubscribe struct {
 	Topic         *string                   `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
 	Subscription  *string                   `protobuf:"bytes,2,req,name=subscription" json:"subscription,omitempty"`
@@ -1428,17 +1684,21 @@
 	Schema        *Schema     `protobuf:"bytes,12,opt,name=schema" json:"schema,omitempty"`
 	// Signal wthether the subscription will initialize on latest
 	// or not -- earliest
-	InitialPosition      *CommandSubscribe_InitialPosition `protobuf:"varint,13,opt,name=initialPosition,enum=pulsar.proto.CommandSubscribe_InitialPosition,def=0" json:"initialPosition,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}                          `json:"-"`
-	XXX_unrecognized     []byte                            `json:"-"`
-	XXX_sizecache        int32                             `json:"-"`
+	InitialPosition *CommandSubscribe_InitialPosition `protobuf:"varint,13,opt,name=initialPosition,enum=pulsar.proto.CommandSubscribe_InitialPosition,def=0" json:"initialPosition,omitempty"`
+	// Mark the subscription as "replicated". Pulsar will make sure
+	// to periodically sync the state of replicated subscriptions
+	// across different clusters (when using geo-replication).
+	ReplicateSubscriptionState *bool    `protobuf:"varint,14,opt,name=replicate_subscription_state,json=replicateSubscriptionState" json:"replicate_subscription_state,omitempty"`
+	XXX_NoUnkeyedLiteral       struct{} `json:"-"`
+	XXX_unrecognized           []byte   `json:"-"`
+	XXX_sizecache              int32    `json:"-"`
 }
 
 func (m *CommandSubscribe) Reset()         { *m = CommandSubscribe{} }
 func (m *CommandSubscribe) String() string { return proto.CompactTextString(m) }
 func (*CommandSubscribe) ProtoMessage()    {}
 func (*CommandSubscribe) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{12}
 }
 func (m *CommandSubscribe) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandSubscribe.Unmarshal(m, b)
@@ -1552,6 +1812,13 @@
 	return Default_CommandSubscribe_InitialPosition
 }
 
+func (m *CommandSubscribe) GetReplicateSubscriptionState() bool {
+	if m != nil && m.ReplicateSubscriptionState != nil {
+		return *m.ReplicateSubscriptionState
+	}
+	return false
+}
+
 type CommandPartitionedTopicMetadata struct {
 	Topic     *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
 	RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
@@ -1572,7 +1839,7 @@
 func (m *CommandPartitionedTopicMetadata) String() string { return proto.CompactTextString(m) }
 func (*CommandPartitionedTopicMetadata) ProtoMessage()    {}
 func (*CommandPartitionedTopicMetadata) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{10}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{13}
 }
 func (m *CommandPartitionedTopicMetadata) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandPartitionedTopicMetadata.Unmarshal(m, b)
@@ -1644,7 +1911,7 @@
 func (m *CommandPartitionedTopicMetadataResponse) String() string { return proto.CompactTextString(m) }
 func (*CommandPartitionedTopicMetadataResponse) ProtoMessage()    {}
 func (*CommandPartitionedTopicMetadataResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{14}
 }
 func (m *CommandPartitionedTopicMetadataResponse) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Unmarshal(m, b)
@@ -1720,7 +1987,7 @@
 func (m *CommandLookupTopic) String() string { return proto.CompactTextString(m) }
 func (*CommandLookupTopic) ProtoMessage()    {}
 func (*CommandLookupTopic) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{12}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{15}
 }
 func (m *CommandLookupTopic) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandLookupTopic.Unmarshal(m, b)
@@ -1805,7 +2072,7 @@
 func (m *CommandLookupTopicResponse) String() string { return proto.CompactTextString(m) }
 func (*CommandLookupTopicResponse) ProtoMessage()    {}
 func (*CommandLookupTopicResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{16}
 }
 func (m *CommandLookupTopicResponse) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandLookupTopicResponse.Unmarshal(m, b)
@@ -1906,7 +2173,7 @@
 func (m *CommandProducer) String() string { return proto.CompactTextString(m) }
 func (*CommandProducer) ProtoMessage()    {}
 func (*CommandProducer) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{14}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{17}
 }
 func (m *CommandProducer) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandProducer.Unmarshal(m, b)
@@ -1990,7 +2257,7 @@
 func (m *CommandSend) String() string { return proto.CompactTextString(m) }
 func (*CommandSend) ProtoMessage()    {}
 func (*CommandSend) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{15}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{18}
 }
 func (m *CommandSend) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandSend.Unmarshal(m, b)
@@ -2046,7 +2313,7 @@
 func (m *CommandSendReceipt) String() string { return proto.CompactTextString(m) }
 func (*CommandSendReceipt) ProtoMessage()    {}
 func (*CommandSendReceipt) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{16}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{19}
 }
 func (m *CommandSendReceipt) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandSendReceipt.Unmarshal(m, b)
@@ -2101,7 +2368,7 @@
 func (m *CommandSendError) String() string { return proto.CompactTextString(m) }
 func (*CommandSendError) ProtoMessage()    {}
 func (*CommandSendError) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{17}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{20}
 }
 func (m *CommandSendError) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandSendError.Unmarshal(m, b)
@@ -2162,7 +2429,7 @@
 func (m *CommandMessage) String() string { return proto.CompactTextString(m) }
 func (*CommandMessage) ProtoMessage()    {}
 func (*CommandMessage) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{18}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{21}
 }
 func (m *CommandMessage) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandMessage.Unmarshal(m, b)
@@ -2221,7 +2488,7 @@
 func (m *CommandAck) String() string { return proto.CompactTextString(m) }
 func (*CommandAck) ProtoMessage()    {}
 func (*CommandAck) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{22}
 }
 func (m *CommandAck) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandAck.Unmarshal(m, b)
@@ -2289,7 +2556,7 @@
 func (m *CommandActiveConsumerChange) String() string { return proto.CompactTextString(m) }
 func (*CommandActiveConsumerChange) ProtoMessage()    {}
 func (*CommandActiveConsumerChange) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{20}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{23}
 }
 func (m *CommandActiveConsumerChange) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandActiveConsumerChange.Unmarshal(m, b)
@@ -2339,7 +2606,7 @@
 func (m *CommandFlow) String() string { return proto.CompactTextString(m) }
 func (*CommandFlow) ProtoMessage()    {}
 func (*CommandFlow) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{21}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{24}
 }
 func (m *CommandFlow) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandFlow.Unmarshal(m, b)
@@ -2385,7 +2652,7 @@
 func (m *CommandUnsubscribe) String() string { return proto.CompactTextString(m) }
 func (*CommandUnsubscribe) ProtoMessage()    {}
 func (*CommandUnsubscribe) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{22}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{25}
 }
 func (m *CommandUnsubscribe) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandUnsubscribe.Unmarshal(m, b)
@@ -2424,6 +2691,7 @@
 	ConsumerId           *uint64        `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
 	RequestId            *uint64        `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
 	MessageId            *MessageIdData `protobuf:"bytes,3,opt,name=message_id,json=messageId" json:"message_id,omitempty"`
+	MessagePublishTime   *uint64        `protobuf:"varint,4,opt,name=message_publish_time,json=messagePublishTime" json:"message_publish_time,omitempty"`
 	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
 	XXX_unrecognized     []byte         `json:"-"`
 	XXX_sizecache        int32          `json:"-"`
@@ -2433,7 +2701,7 @@
 func (m *CommandSeek) String() string { return proto.CompactTextString(m) }
 func (*CommandSeek) ProtoMessage()    {}
 func (*CommandSeek) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{23}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{26}
 }
 func (m *CommandSeek) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandSeek.Unmarshal(m, b)
@@ -2474,6 +2742,13 @@
 	return nil
 }
 
+func (m *CommandSeek) GetMessagePublishTime() uint64 {
+	if m != nil && m.MessagePublishTime != nil {
+		return *m.MessagePublishTime
+	}
+	return 0
+}
+
 // Message sent by broker to client when a topic
 // has been forcefully terminated and there are no more
 // messages left to consume
@@ -2488,7 +2763,7 @@
 func (m *CommandReachedEndOfTopic) String() string { return proto.CompactTextString(m) }
 func (*CommandReachedEndOfTopic) ProtoMessage()    {}
 func (*CommandReachedEndOfTopic) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{24}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{27}
 }
 func (m *CommandReachedEndOfTopic) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandReachedEndOfTopic.Unmarshal(m, b)
@@ -2527,7 +2802,7 @@
 func (m *CommandCloseProducer) String() string { return proto.CompactTextString(m) }
 func (*CommandCloseProducer) ProtoMessage()    {}
 func (*CommandCloseProducer) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{25}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{28}
 }
 func (m *CommandCloseProducer) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandCloseProducer.Unmarshal(m, b)
@@ -2573,7 +2848,7 @@
 func (m *CommandCloseConsumer) String() string { return proto.CompactTextString(m) }
 func (*CommandCloseConsumer) ProtoMessage()    {}
 func (*CommandCloseConsumer) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{26}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{29}
 }
 func (m *CommandCloseConsumer) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandCloseConsumer.Unmarshal(m, b)
@@ -2621,7 +2896,7 @@
 func (m *CommandRedeliverUnacknowledgedMessages) String() string { return proto.CompactTextString(m) }
 func (*CommandRedeliverUnacknowledgedMessages) ProtoMessage()    {}
 func (*CommandRedeliverUnacknowledgedMessages) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{27}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{30}
 }
 func (m *CommandRedeliverUnacknowledgedMessages) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Unmarshal(m, b)
@@ -2667,7 +2942,7 @@
 func (m *CommandSuccess) String() string { return proto.CompactTextString(m) }
 func (*CommandSuccess) ProtoMessage()    {}
 func (*CommandSuccess) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{28}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{31}
 }
 func (m *CommandSuccess) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandSuccess.Unmarshal(m, b)
@@ -2718,7 +2993,7 @@
 func (m *CommandProducerSuccess) String() string { return proto.CompactTextString(m) }
 func (*CommandProducerSuccess) ProtoMessage()    {}
 func (*CommandProducerSuccess) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{29}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{32}
 }
 func (m *CommandProducerSuccess) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandProducerSuccess.Unmarshal(m, b)
@@ -2781,7 +3056,7 @@
 func (m *CommandError) String() string { return proto.CompactTextString(m) }
 func (*CommandError) ProtoMessage()    {}
 func (*CommandError) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{30}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{33}
 }
 func (m *CommandError) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandError.Unmarshal(m, b)
@@ -2835,7 +3110,7 @@
 func (m *CommandPing) String() string { return proto.CompactTextString(m) }
 func (*CommandPing) ProtoMessage()    {}
 func (*CommandPing) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{31}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{34}
 }
 func (m *CommandPing) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandPing.Unmarshal(m, b)
@@ -2865,7 +3140,7 @@
 func (m *CommandPong) String() string { return proto.CompactTextString(m) }
 func (*CommandPong) ProtoMessage()    {}
 func (*CommandPong) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{32}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{35}
 }
 func (m *CommandPong) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandPong.Unmarshal(m, b)
@@ -2899,7 +3174,7 @@
 func (m *CommandConsumerStats) String() string { return proto.CompactTextString(m) }
 func (*CommandConsumerStats) ProtoMessage()    {}
 func (*CommandConsumerStats) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{33}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{36}
 }
 func (m *CommandConsumerStats) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandConsumerStats.Unmarshal(m, b)
@@ -2970,7 +3245,7 @@
 func (m *CommandConsumerStatsResponse) String() string { return proto.CompactTextString(m) }
 func (*CommandConsumerStatsResponse) ProtoMessage()    {}
 func (*CommandConsumerStatsResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{34}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{37}
 }
 func (m *CommandConsumerStatsResponse) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandConsumerStatsResponse.Unmarshal(m, b)
@@ -3107,7 +3382,7 @@
 func (m *CommandGetLastMessageId) String() string { return proto.CompactTextString(m) }
 func (*CommandGetLastMessageId) ProtoMessage()    {}
 func (*CommandGetLastMessageId) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{35}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{38}
 }
 func (m *CommandGetLastMessageId) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandGetLastMessageId.Unmarshal(m, b)
@@ -3153,7 +3428,7 @@
 func (m *CommandGetLastMessageIdResponse) String() string { return proto.CompactTextString(m) }
 func (*CommandGetLastMessageIdResponse) ProtoMessage()    {}
 func (*CommandGetLastMessageIdResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{36}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{39}
 }
 func (m *CommandGetLastMessageIdResponse) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandGetLastMessageIdResponse.Unmarshal(m, b)
@@ -3200,7 +3475,7 @@
 func (m *CommandGetTopicsOfNamespace) String() string { return proto.CompactTextString(m) }
 func (*CommandGetTopicsOfNamespace) ProtoMessage()    {}
 func (*CommandGetTopicsOfNamespace) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{40}
 }
 func (m *CommandGetTopicsOfNamespace) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandGetTopicsOfNamespace.Unmarshal(m, b)
@@ -3255,7 +3530,7 @@
 func (m *CommandGetTopicsOfNamespaceResponse) String() string { return proto.CompactTextString(m) }
 func (*CommandGetTopicsOfNamespaceResponse) ProtoMessage()    {}
 func (*CommandGetTopicsOfNamespaceResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{38}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{41}
 }
 func (m *CommandGetTopicsOfNamespaceResponse) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Unmarshal(m, b)
@@ -3302,7 +3577,7 @@
 func (m *CommandGetSchema) String() string { return proto.CompactTextString(m) }
 func (*CommandGetSchema) ProtoMessage()    {}
 func (*CommandGetSchema) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{39}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{42}
 }
 func (m *CommandGetSchema) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandGetSchema.Unmarshal(m, b)
@@ -3358,7 +3633,7 @@
 func (m *CommandGetSchemaResponse) String() string { return proto.CompactTextString(m) }
 func (*CommandGetSchemaResponse) ProtoMessage()    {}
 func (*CommandGetSchemaResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{40}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{43}
 }
 func (m *CommandGetSchemaResponse) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_CommandGetSchemaResponse.Unmarshal(m, b)
@@ -3449,6 +3724,8 @@
 	GetTopicsOfNamespaceResponse    *CommandGetTopicsOfNamespaceResponse     `protobuf:"bytes,33,opt,name=getTopicsOfNamespaceResponse" json:"getTopicsOfNamespaceResponse,omitempty"`
 	GetSchema                       *CommandGetSchema                        `protobuf:"bytes,34,opt,name=getSchema" json:"getSchema,omitempty"`
 	GetSchemaResponse               *CommandGetSchemaResponse                `protobuf:"bytes,35,opt,name=getSchemaResponse" json:"getSchemaResponse,omitempty"`
+	AuthChallenge                   *CommandAuthChallenge                    `protobuf:"bytes,36,opt,name=authChallenge" json:"authChallenge,omitempty"`
+	AuthResponse                    *CommandAuthResponse                     `protobuf:"bytes,37,opt,name=authResponse" json:"authResponse,omitempty"`
 	XXX_NoUnkeyedLiteral            struct{}                                 `json:"-"`
 	XXX_unrecognized                []byte                                   `json:"-"`
 	XXX_sizecache                   int32                                    `json:"-"`
@@ -3458,7 +3735,7 @@
 func (m *BaseCommand) String() string { return proto.CompactTextString(m) }
 func (*BaseCommand) ProtoMessage()    {}
 func (*BaseCommand) Descriptor() ([]byte, []int) {
-	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41}
+	return fileDescriptor_PulsarApi_976de857822f53b5, []int{44}
 }
 func (m *BaseCommand) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_BaseCommand.Unmarshal(m, b)
@@ -3723,6 +4000,20 @@
 	return nil
 }
 
+func (m *BaseCommand) GetAuthChallenge() *CommandAuthChallenge {
+	if m != nil {
+		return m.AuthChallenge
+	}
+	return nil
+}
+
+func (m *BaseCommand) GetAuthResponse() *CommandAuthResponse {
+	if m != nil {
+		return m.AuthResponse
+	}
+	return nil
+}
+
 func init() {
 	proto.RegisterType((*Schema)(nil), "pulsar.proto.Schema")
 	proto.RegisterType((*MessageIdData)(nil), "pulsar.proto.MessageIdData")
@@ -3733,6 +4024,9 @@
 	proto.RegisterType((*SingleMessageMetadata)(nil), "pulsar.proto.SingleMessageMetadata")
 	proto.RegisterType((*CommandConnect)(nil), "pulsar.proto.CommandConnect")
 	proto.RegisterType((*CommandConnected)(nil), "pulsar.proto.CommandConnected")
+	proto.RegisterType((*CommandAuthResponse)(nil), "pulsar.proto.CommandAuthResponse")
+	proto.RegisterType((*CommandAuthChallenge)(nil), "pulsar.proto.CommandAuthChallenge")
+	proto.RegisterType((*AuthData)(nil), "pulsar.proto.AuthData")
 	proto.RegisterType((*CommandSubscribe)(nil), "pulsar.proto.CommandSubscribe")
 	proto.RegisterType((*CommandPartitionedTopicMetadata)(nil), "pulsar.proto.CommandPartitionedTopicMetadata")
 	proto.RegisterType((*CommandPartitionedTopicMetadataResponse)(nil), "pulsar.proto.CommandPartitionedTopicMetadataResponse")
@@ -3781,263 +4075,285 @@
 	proto.RegisterEnum("pulsar.proto.BaseCommand_Type", BaseCommand_Type_name, BaseCommand_Type_value)
 }
 
-func init() { proto.RegisterFile("PulsarApi.proto", fileDescriptor_PulsarApi_9ff3bd5e091a4809) }
+func init() { proto.RegisterFile("PulsarApi.proto", fileDescriptor_PulsarApi_976de857822f53b5) }
 
-var fileDescriptor_PulsarApi_9ff3bd5e091a4809 = []byte{
-	// 4065 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x3b, 0x4d, 0x6f, 0x1b, 0x49,
-	0x76, 0x6e, 0x7e, 0x48, 0xe4, 0xe3, 0x57, 0xb9, 0x2c, 0x6b, 0xda, 0x1f, 0x63, 0xd3, 0xed, 0xb5,
-	0x47, 0xe3, 0x1d, 0x2b, 0xb6, 0xec, 0x75, 0x66, 0xbc, 0x9b, 0x60, 0x28, 0xaa, 0x6d, 0x33, 0x96,
-	0x48, 0x6d, 0x91, 0xf2, 0x62, 0x27, 0xbb, 0xe8, 0x6d, 0x75, 0x97, 0xa9, 0x86, 0x9a, 0xdd, 0x4c,
-	0x7f, 0x68, 0xac, 0x39, 0xe4, 0x36, 0x08, 0x02, 0x04, 0x08, 0x90, 0x1c, 0x73, 0xc8, 0x21, 0x08,
-	0x72, 0xce, 0x2d, 0x40, 0x7e, 0x43, 0x6e, 0xb9, 0xe4, 0x94, 0x4b, 0x72, 0x0c, 0x90, 0x43, 0x90,
-	0x73, 0x50, 0xd5, 0xd5, 0x5f, 0x24, 0x45, 0xca, 0x3b, 0x7b, 0xc8, 0x89, 0xdd, 0xaf, 0x5e, 0xbd,
-	0x7a, 0xf5, 0xde, 0xab, 0xf7, 0x55, 0x4d, 0x68, 0x1d, 0x86, 0xb6, 0xaf, 0x7b, 0x9d, 0xa9, 0xb5,
-	0x3d, 0xf5, 0xdc, 0xc0, 0xc5, 0xf5, 0x29, 0x07, 0x44, 0x6f, 0xca, 0x7f, 0x48, 0xb0, 0x36, 0x34,
-	0x4e, 0xe8, 0x44, 0xc7, 0x18, 0x4a, 0x8e, 0x3e, 0xa1, 0xb2, 0xd4, 0x2e, 0x6c, 0x55, 0x09, 0x7f,
-	0xc6, 0x77, 0xa1, 0xe6, 0xf3, 0x51, 0xcd, 0xd4, 0x03, 0x5d, 0x2e, 0xb6, 0x0b, 0x5b, 0x75, 0x02,
-	0x11, 0x68, 0x4f, 0x0f, 0x74, 0xfc, 0x18, 0x4a, 0xc1, 0xf9, 0x94, 0xca, 0xa5, 0x76, 0x61, 0xab,
-	0xb9, 0x73, 0x63, 0x3b, 0x4b, 0x7c, 0x3b, 0x22, 0xbc, 0x3d, 0x3a, 0x9f, 0x52, 0xc2, 0xd1, 0xf0,
-	0x0b, 0x80, 0xa9, 0xe7, 0x4e, 0xa9, 0x17, 0x58, 0xd4, 0x97, 0xcb, 0xed, 0xe2, 0x56, 0x6d, 0x67,
-	0x33, 0x3f, 0xe9, 0x2d, 0x3d, 0x7f, 0xa7, 0xdb, 0x21, 0x25, 0x19, 0x4c, 0xe5, 0x0f, 0xa1, 0xc4,
-	0xa8, 0xe0, 0x0a, 0x94, 0xfa, 0xae, 0x43, 0xd1, 0x15, 0x0c, 0xb0, 0x36, 0x0c, 0x3c, 0xcb, 0x19,
-	0x23, 0x89, 0x41, 0xff, 0xc8, 0x77, 0x1d, 0x54, 0xc0, 0x75, 0xa8, 0x1c, 0x32, 0x2a, 0xc7, 0xe1,
-	0x7b, 0x54, 0x64, 0xf0, 0xce, 0x99, 0xe7, 0xa2, 0x92, 0xf2, 0x17, 0x12, 0x34, 0x0e, 0xa8, 0xef,
-	0xeb, 0x63, 0xda, 0x33, 0x39, 0xe3, 0x37, 0xa1, 0x62, 0x53, 0x73, 0x4c, 0xbd, 0x9e, 0xc9, 0x77,
-	0x5c, 0x22, 0xc9, 0x3b, 0x96, 0x61, 0x9d, 0x3a, 0x81, 0x77, 0xde, 0x33, 0xe5, 0x02, 0x1f, 0x8a,
-	0x5f, 0x71, 0x1b, 0xaa, 0x53, 0xdd, 0x0b, 0xac, 0xc0, 0x72, 0x1d, 0xb9, 0xd8, 0x96, 0xb6, 0xca,
-	0x2f, 0x0b, 0x8f, 0x9f, 0x92, 0x14, 0x88, 0xef, 0x43, 0xed, 0x58, 0x0f, 0x8c, 0x13, 0xcd, 0x72,
-	0x4c, 0xfa, 0x41, 0x2e, 0x25, 0x38, 0xc0, 0xc1, 0x3d, 0x06, 0x55, 0x76, 0xa0, 0x12, 0x6f, 0x13,
-	0x23, 0x28, 0x9e, 0xd2, 0x73, 0x21, 0x75, 0xf6, 0x88, 0x37, 0xa0, 0x7c, 0xc6, 0x86, 0xf8, 0xe2,
-	0x55, 0x12, 0xbd, 0x28, 0x2f, 0xa0, 0xfe, 0x96, 0x9e, 0xef, 0xbb, 0xce, 0xf8, 0x52, 0xf3, 0x4a,
-	0xf1, 0x3c, 0x1b, 0x9a, 0xaa, 0x63, 0x78, 0xe7, 0x53, 0xc6, 0xde, 0x5b, 0x7a, 0xee, 0xaf, 0x9a,
-	0x59, 0x17, 0x33, 0xf1, 0x0e, 0x54, 0x26, 0x34, 0xd0, 0x85, 0xe6, 0x97, 0xa9, 0x2a, 0xc1, 0x53,
-	0xfe, 0xb7, 0x0c, 0x2d, 0x21, 0xe8, 0x03, 0x01, 0xc3, 0xf7, 0xa1, 0x31, 0xf5, 0x5c, 0x33, 0x34,
-	0xa8, 0xa7, 0x65, 0x2c, 0xac, 0x1e, 0x03, 0xfb, 0xb1, 0xa5, 0xd1, 0x3f, 0x09, 0xa9, 0x63, 0x50,
-	0xcd, 0x8a, 0xe5, 0x0e, 0x31, 0xa8, 0x67, 0xe2, 0x7b, 0x50, 0x9f, 0x86, 0xc7, 0xb6, 0xe5, 0x9f,
-	0x68, 0x81, 0x35, 0xa1, 0xdc, 0x16, 0x4b, 0xa4, 0x26, 0x60, 0x23, 0x6b, 0x32, 0x6b, 0x5d, 0xa5,
-	0xcb, 0x5a, 0x17, 0xfe, 0x0c, 0x5a, 0x1e, 0x9d, 0xda, 0x96, 0xa1, 0x07, 0xd4, 0xd4, 0xde, 0x7b,
-	0xee, 0x44, 0x2e, 0xb7, 0xa5, 0xad, 0x2a, 0x69, 0xa6, 0xe0, 0x57, 0x9e, 0x3b, 0xe1, 0x3b, 0x89,
-	0x35, 0xad, 0x31, 0x19, 0xae, 0x71, 0xb4, 0x7a, 0x02, 0x7c, 0x4b, 0xcf, 0x19, 0xa3, 0xc9, 0x34,
-	0x2d, 0x70, 0xe5, 0xf5, 0x76, 0x71, 0xab, 0x4a, 0x6a, 0x09, 0x6c, 0xe4, 0x62, 0x15, 0x6a, 0x86,
-	0x3b, 0x99, 0x7a, 0xd4, 0xf7, 0x99, 0x21, 0x55, 0xda, 0xd2, 0x56, 0x73, 0xe7, 0xd3, 0x3c, 0xa7,
-	0xdd, 0x14, 0x81, 0x99, 0xfe, 0xcb, 0x52, 0x7f, 0xd0, 0x57, 0x49, 0x76, 0x1e, 0xde, 0x86, 0xab,
-	0xa1, 0x13, 0x03, 0xa8, 0xa9, 0xf9, 0xd6, 0x77, 0x54, 0xae, 0xb6, 0xa5, 0xad, 0xc6, 0x4b, 0xe9,
-	0x09, 0x41, 0xd9, 0xb1, 0xa1, 0xf5, 0x1d, 0xc5, 0xcf, 0xe1, 0xba, 0x13, 0x4e, 0xb4, 0x49, 0xa4,
-	0x1f, 0x5f, 0xb3, 0x1c, 0x8d, 0x1b, 0xa5, 0x5c, 0xe3, 0x56, 0x2a, 0x3d, 0x25, 0xd8, 0x09, 0x27,
-	0x42, 0x7d, 0x7e, 0xcf, 0xd9, 0x65, 0x83, 0xb8, 0x0d, 0x40, 0xcf, 0xa8, 0x13, 0x44, 0x62, 0xaf,
-	0xb7, 0xa5, 0xad, 0x12, 0x23, 0x5f, 0xe5, 0x40, 0x2e, 0x77, 0x15, 0x5a, 0x34, 0x31, 0x31, 0x26,
-	0x17, 0x5f, 0x6e, 0x70, 0xe1, 0xdf, 0xce, 0x6f, 0x29, 0x6f, 0x87, 0xa4, 0x49, 0xf3, 0x76, 0xf9,
-	0x59, 0x8e, 0x8c, 0x6e, 0x8f, 0x5d, 0xb9, 0x19, 0xa9, 0x21, 0x05, 0x77, 0xec, 0xb1, 0x8b, 0x3f,
-	0x07, 0x94, 0x41, 0x9c, 0xea, 0x9e, 0x3e, 0x91, 0x5b, 0x6d, 0x69, 0xab, 0x4e, 0x32, 0x04, 0x0e,
-	0x19, 0x18, 0x3f, 0x80, 0xa6, 0x70, 0x60, 0x67, 0xd4, 0xe3, 0xc2, 0x46, 0x1c, 0xb1, 0x11, 0x41,
-	0xdf, 0x45, 0x40, 0xfc, 0x35, 0xdc, 0xc8, 0x29, 0x56, 0x3b, 0x7e, 0xf1, 0x5c, 0xa3, 0x8e, 0xe1,
-	0x9a, 0xd4, 0x94, 0xaf, 0xb6, 0xa5, 0xad, 0xca, 0xcb, 0xf2, 0x7b, 0xdd, 0xf6, 0x29, 0xd9, 0xcc,
-	0xea, 0x7a, 0xf7, 0xc5, 0x73, 0x35, 0x42, 0x52, 0xfe, 0xa1, 0x00, 0xd7, 0x87, 0x96, 0x33, 0xb6,
-	0xe9, 0xac, 0xf9, 0xe7, 0xad, 0x52, 0xba, 0xb4, 0x55, 0xce, 0x19, 0x5b, 0x61, 0xb1, 0xb1, 0x4d,
-	0xf5, 0x73, 0xdb, 0xd5, 0x85, 0xf6, 0xd9, 0xa9, 0x28, 0x93, 0x9a, 0x80, 0x71, 0xad, 0x3f, 0x82,
-	0x06, 0xb3, 0x03, 0xdd, 0x60, 0xc6, 0xed, 0x86, 0x01, 0xf7, 0x49, 0xc9, 0x7e, 0xea, 0xc9, 0xd8,
-	0x20, 0x0c, 0x66, 0x74, 0x5d, 0x5e, 0xa0, 0xeb, 0xa5, 0x92, 0x5a, 0xbb, 0x8c, 0xa4, 0xfe, 0xbe,
-	0x08, 0xcd, 0xae, 0x3b, 0x99, 0xe8, 0x8e, 0xd9, 0x75, 0x1d, 0x87, 0x1a, 0x01, 0xd3, 0x92, 0x61,
-	0x5b, 0x6c, 0xdd, 0x58, 0x4b, 0x91, 0x8b, 0x68, 0x44, 0xd0, 0x58, 0x4b, 0x5f, 0x41, 0x4d, 0x0f,
-	0x83, 0x13, 0x6d, 0x42, 0x83, 0x13, 0xd7, 0xe4, 0xf2, 0x68, 0xee, 0xc8, 0x79, 0x51, 0x76, 0xc2,
-	0xe0, 0xe4, 0x80, 0x8f, 0x13, 0xd0, 0x93, 0x67, 0xbc, 0x05, 0x28, 0x33, 0x35, 0x72, 0x43, 0xe2,
-	0x8c, 0xa7, 0x58, 0xdc, 0x11, 0xdd, 0x82, 0x2a, 0xc7, 0x14, 0x6e, 0x8f, 0x19, 0x4b, 0x85, 0x01,
-	0x78, 0xd4, 0xf8, 0x02, 0x10, 0x5f, 0xc6, 0x70, 0xed, 0x84, 0xd5, 0xc8, 0xc5, 0x4b, 0x4f, 0x48,
-	0x2b, 0x1e, 0x8a, 0xf9, 0x7d, 0x0c, 0xd7, 0xa6, 0x9e, 0xfb, 0xe1, 0x5c, 0x0b, 0x5c, 0xed, 0xd8,
-	0x73, 0x4f, 0xa9, 0xa7, 0x85, 0x9e, 0x2d, 0x9c, 0x06, 0xe2, 0x43, 0x23, 0x77, 0x97, 0x0f, 0x1c,
-	0x79, 0x36, 0x7e, 0x0c, 0xd8, 0xf5, 0xac, 0xb1, 0xe5, 0xe8, 0xb6, 0x36, 0xf5, 0x2c, 0xc7, 0xb0,
-	0xa6, 0xba, 0x2d, 0xaf, 0x73, 0xec, 0xab, 0xf1, 0xc8, 0x61, 0x3c, 0x80, 0xbf, 0xc8, 0xa0, 0xa7,
-	0x1c, 0x57, 0x22, 0xe2, 0xf1, 0x48, 0x27, 0xe6, 0xfc, 0x09, 0x6c, 0xe4, 0xb1, 0x85, 0x10, 0xab,
-	0x1c, 0x1f, 0x67, 0xf1, 0x23, 0x61, 0x28, 0x63, 0x40, 0x79, 0x35, 0x51, 0x93, 0x1f, 0x27, 0xea,
-	0x9d, 0x51, 0x6f, 0x56, 0x51, 0x11, 0x34, 0xde, 0xf8, 0x22, 0x31, 0x15, 0x2e, 0x12, 0x93, 0xf2,
-	0x2f, 0xe5, 0x64, 0xa5, 0x61, 0x78, 0xec, 0x1b, 0x9e, 0x75, 0x4c, 0x59, 0x48, 0x0a, 0xdc, 0xa9,
-	0x65, 0x88, 0x05, 0xa2, 0x17, 0xac, 0x40, 0xdd, 0x8f, 0x50, 0xf8, 0x19, 0x17, 0x11, 0x32, 0x07,
-	0xc3, 0x5f, 0xc3, 0xba, 0x1f, 0x1e, 0x33, 0x9f, 0xc9, 0x4f, 0x43, 0x73, 0xe7, 0xe1, 0x9c, 0x63,
-	0xcd, 0x2d, 0xb5, 0x3d, 0x8c, 0xb0, 0x49, 0x3c, 0x8d, 0xc5, 0x22, 0xc3, 0x75, 0xfc, 0x70, 0x42,
-	0x3d, 0x16, 0x8b, 0x4a, 0x51, 0x2c, 0x8a, 0x41, 0x3d, 0x13, 0x7f, 0x0a, 0xe0, 0xb1, 0xc8, 0xe4,
-	0x07, 0x6c, 0xbc, 0xcc, 0xc7, 0xab, 0x02, 0xd2, 0x33, 0xd9, 0xc9, 0x4d, 0xe6, 0x73, 0x4b, 0x13,
-	0x61, 0x22, 0x06, 0x72, 0x3b, 0x7b, 0x00, 0xcd, 0xa9, 0x67, 0xb9, 0x9e, 0x15, 0x9c, 0x6b, 0x36,
-	0x3d, 0xa3, 0x91, 0xa6, 0xcb, 0xa4, 0x11, 0x43, 0xf7, 0x19, 0x10, 0xdf, 0x81, 0x75, 0x33, 0xf4,
-	0xf4, 0x63, 0x9b, 0x72, 0xd5, 0x56, 0x5e, 0x96, 0x02, 0x2f, 0xa4, 0x24, 0x06, 0x62, 0x15, 0x90,
-	0x1f, 0xe8, 0x5e, 0x10, 0x7b, 0x75, 0xc6, 0x10, 0xd3, 0x69, 0x6d, 0xe7, 0x56, 0x7e, 0xdb, 0xb9,
-	0xf4, 0x87, 0x34, 0xf9, 0xa4, 0x04, 0x96, 0x8b, 0xf5, 0x70, 0xb9, 0x58, 0xcf, 0x76, 0xe0, 0x51,
-	0xdd, 0xd4, 0x12, 0x0f, 0xc2, 0xe3, 0x48, 0x85, 0x34, 0x18, 0xb4, 0x1b, 0x03, 0xf1, 0x17, 0xb0,
-	0x16, 0x39, 0x5b, 0x1e, 0x3b, 0x6a, 0x3b, 0x1b, 0x8b, 0x92, 0x44, 0x22, 0x70, 0xf0, 0x6f, 0xa0,
-	0x65, 0x39, 0x56, 0x60, 0xe9, 0xf6, 0xa1, 0xeb, 0x47, 0x79, 0x56, 0x83, 0x9f, 0xf3, 0xed, 0x15,
-	0x5a, 0xec, 0xe5, 0x67, 0xbd, 0x5c, 0xdb, 0xd7, 0x03, 0xea, 0x07, 0x64, 0x96, 0x9c, 0xb2, 0x03,
-	0xeb, 0x42, 0xe3, 0xb8, 0x01, 0x55, 0xf5, 0x83, 0x61, 0x87, 0xbe, 0x75, 0x16, 0xe7, 0x94, 0x27,
-	0xba, 0x47, 0x4d, 0x24, 0xb1, 0x4c, 0xf2, 0x95, 0x6e, 0xd9, 0xee, 0x19, 0xf5, 0x50, 0x41, 0xf9,
-	0x31, 0xb4, 0x66, 0xe8, 0x33, 0xe4, 0x68, 0x05, 0x74, 0x85, 0x21, 0xab, 0xba, 0x67, 0x5b, 0xec,
-	0x4d, 0x52, 0xfe, 0x53, 0x82, 0xbb, 0x82, 0xbd, 0xc3, 0xd8, 0x05, 0x52, 0x73, 0xc4, 0x0c, 0x38,
-	0x09, 0x0a, 0x8b, 0xcd, 0x3b, 0x6f, 0x57, 0x85, 0x59, 0xbb, 0x5a, 0xec, 0x20, 0x8a, 0x1f, 0xe7,
-	0x20, 0x4a, 0x1f, 0xe9, 0x20, 0xca, 0x17, 0x3a, 0x88, 0x7f, 0x2a, 0xc0, 0x67, 0x2b, 0xf6, 0x49,
-	0xa8, 0x3f, 0x75, 0x1d, 0x9f, 0xe2, 0x3b, 0x00, 0x49, 0x38, 0x60, 0x41, 0x50, 0xda, 0x6a, 0x90,
-	0x0c, 0x64, 0xd5, 0xce, 0x7f, 0x05, 0x15, 0x4f, 0x90, 0xe2, 0xfb, 0x6d, 0xee, 0x7c, 0xbd, 0xd0,
-	0x1c, 0x56, 0xf1, 0xb1, 0xbd, 0xef, 0xba, 0xa7, 0xe1, 0x94, 0x1f, 0xf7, 0x84, 0x22, 0xfe, 0x3d,
-	0x28, 0x53, 0xcf, 0x73, 0x3d, 0x2e, 0x9b, 0xf9, 0x2a, 0x86, 0xbb, 0x36, 0x95, 0x21, 0x90, 0x08,
-	0x8f, 0x15, 0x08, 0xe2, 0xb8, 0x09, 0xf1, 0xc4, 0xaf, 0xca, 0x03, 0x80, 0x74, 0x09, 0x5c, 0x63,
-	0xa6, 0x66, 0x18, 0xd4, 0xf7, 0x23, 0xeb, 0x62, 0x16, 0xc5, 0xac, 0x4b, 0xf9, 0xbe, 0x00, 0x58,
-	0xb0, 0x2c, 0xd0, 0xb9, 0xfe, 0x7f, 0x2b, 0xab, 0xf8, 0x31, 0x34, 0x98, 0xbe, 0x98, 0xcf, 0xd0,
-	0x03, 0xeb, 0x2c, 0x12, 0x50, 0x12, 0x85, 0xf3, 0x63, 0x17, 0x98, 0x50, 0xe9, 0xe3, 0x4c, 0xa8,
-	0xfc, 0x91, 0x26, 0xb4, 0x76, 0xa1, 0x09, 0xfd, 0x5b, 0x11, 0x6e, 0xce, 0xcb, 0x21, 0xb1, 0x9a,
-	0x47, 0x80, 0xa2, 0xb8, 0xc9, 0x74, 0x60, 0x19, 0xf4, 0xc8, 0xb3, 0xb9, 0xed, 0x54, 0xc9, 0x1c,
-	0x1c, 0x3f, 0x81, 0x6b, 0xb3, 0xb0, 0x91, 0xed, 0x8b, 0xa4, 0x69, 0xd1, 0x10, 0x1e, 0xcc, 0x19,
-	0xd5, 0xb3, 0x85, 0x46, 0xb5, 0x80, 0xb3, 0xc5, 0x76, 0x94, 0x57, 0x54, 0x69, 0xa5, 0xa2, 0xca,
-	0x4b, 0x14, 0x95, 0xd8, 0xe4, 0xda, 0xc7, 0xdb, 0xe4, 0x7a, 0xce, 0x26, 0x79, 0xca, 0x16, 0xa5,
-	0x21, 0x27, 0x9e, 0x1b, 0x8e, 0x4f, 0x34, 0x3f, 0x12, 0x03, 0x4f, 0x46, 0x2a, 0xf9, 0x94, 0x8d,
-	0xe7, 0x24, 0x11, 0x5a, 0x2a, 0x2c, 0xe5, 0x59, 0xce, 0xaa, 0xeb, 0x50, 0x21, 0xd4, 0xb4, 0x3c,
-	0x6a, 0x30, 0xdf, 0x57, 0x83, 0x75, 0x91, 0x1f, 0x20, 0x29, 0x63, 0xe3, 0x05, 0xe5, 0xaf, 0x0b,
-	0xd0, 0x8a, 0x8f, 0xa5, 0xa8, 0xf4, 0x2e, 0x30, 0xf0, 0xbb, 0x50, 0x4b, 0x0a, 0xc4, 0xb4, 0xf6,
-	0x8b, 0x41, 0x73, 0xf1, 0xb6, 0xb8, 0x20, 0xde, 0xe6, 0x0b, 0xcc, 0x92, 0xc8, 0x94, 0xb3, 0x05,
-	0xe6, 0x7d, 0xa8, 0x8a, 0xe2, 0x80, 0x9a, 0x79, 0xc9, 0xa7, 0xf0, 0x5c, 0x18, 0x5c, 0xbb, 0x64,
-	0x18, 0x4c, 0xe3, 0xdb, 0xfa, 0xea, 0xf8, 0xa6, 0x84, 0x50, 0x8b, 0x43, 0x17, 0x75, 0xcc, 0xd9,
-	0xad, 0x4b, 0x73, 0x5b, 0x5f, 0x59, 0x17, 0xff, 0x08, 0xea, 0xd9, 0xa2, 0x4e, 0x74, 0x25, 0xa4,
-	0xa7, 0xa4, 0x96, 0xa9, 0xe5, 0x94, 0xbf, 0x92, 0x12, 0x87, 0xc3, 0xd6, 0x25, 0xd4, 0xa0, 0xd6,
-	0x34, 0xf8, 0x1d, 0x2c, 0xff, 0x12, 0x20, 0x93, 0x79, 0x14, 0x57, 0x67, 0x1e, 0xd5, 0x49, 0xfc,
-	0xaa, 0xfc, 0xad, 0x94, 0x26, 0x7e, 0xd4, 0x31, 0xb9, 0x39, 0xff, 0x0e, 0x58, 0x4a, 0x8e, 0x4e,
-	0x71, 0x61, 0x53, 0x6a, 0xe9, 0xd1, 0x29, 0x71, 0xbb, 0x4c, 0xdc, 0xf9, 0xdf, 0x48, 0x49, 0xad,
-	0x22, 0x76, 0x31, 0x9b, 0x1c, 0x4a, 0x73, 0xc9, 0x61, 0x5e, 0x22, 0x8c, 0xbd, 0x4b, 0x4b, 0x84,
-	0x25, 0xce, 0x1e, 0x35, 0xa9, 0x6d, 0x9d, 0x51, 0xef, 0x5c, 0x33, 0xdc, 0xd0, 0x09, 0xb8, 0x4c,
-	0x79, 0x41, 0xdf, 0x4a, 0x87, 0xba, 0x6c, 0x44, 0xf9, 0x9f, 0x22, 0x80, 0xe0, 0xae, 0x63, 0x9c,
-	0xae, 0xe6, 0xec, 0xa7, 0x50, 0xd1, 0x8d, 0x53, 0x8d, 0x37, 0xec, 0x0a, 0x5c, 0x36, 0xed, 0x85,
-	0x0e, 0xaf, 0x63, 0x9c, 0x6e, 0x77, 0x8c, 0xd3, 0x28, 0x29, 0xd6, 0xa3, 0x87, 0x39, 0x45, 0x17,
-	0x3f, 0x62, 0x5b, 0x43, 0x40, 0x67, 0xba, 0x6d, 0x99, 0x3a, 0xaf, 0x1a, 0xb3, 0xb1, 0x76, 0xeb,
-	0x42, 0x06, 0xde, 0x25, 0x13, 0x22, 0x5d, 0xb5, 0xce, 0xf2, 0x00, 0xc6, 0xd0, 0x5c, 0x2f, 0xf1,
-	0xe6, 0xdc, 0x69, 0x4d, 0x1a, 0x66, 0xb9, 0x7e, 0xe2, 0xe7, 0xb0, 0x2e, 0x36, 0x88, 0x9b, 0x00,
-	0x3d, 0xc7, 0xb4, 0xce, 0x2c, 0x33, 0xd4, 0x6d, 0x74, 0x85, 0xbd, 0x77, 0xc3, 0x49, 0x68, 0x73,
-	0x37, 0x8c, 0x24, 0xe5, 0x2f, 0x25, 0x68, 0xcd, 0xf0, 0x82, 0xef, 0xc0, 0xcd, 0xa3, 0x99, 0xe6,
-	0x4a, 0xd7, 0xf5, 0xbc, 0x90, 0x17, 0x20, 0xe8, 0x0a, 0xde, 0x04, 0xbc, 0x47, 0x33, 0x9d, 0x1a,
-	0x3e, 0x0b, 0x49, 0x78, 0x03, 0x50, 0xf7, 0x84, 0x1a, 0xa7, 0x7e, 0x38, 0x39, 0xb0, 0xfc, 0x89,
-	0x1e, 0x18, 0x27, 0xa8, 0x80, 0x6f, 0xc0, 0x75, 0xde, 0x69, 0xd9, 0xa3, 0x43, 0xea, 0x59, 0xba,
-	0x6d, 0x7d, 0x47, 0xa3, 0x09, 0x45, 0x7c, 0x0d, 0x5a, 0x7b, 0x34, 0xee, 0x68, 0x44, 0xc0, 0x92,
-	0x72, 0x0c, 0xb7, 0x12, 0x39, 0x31, 0x26, 0xbb, 0x42, 0xc3, 0xdd, 0x13, 0xdd, 0xb9, 0x8c, 0x81,
-	0x2a, 0x50, 0xb5, 0x7c, 0x4d, 0xe7, 0x73, 0x79, 0x7c, 0x4c, 0x3c, 0x61, 0xc5, 0xf2, 0x23, 0x92,
-	0xca, 0xbb, 0xc4, 0x4d, 0xbd, 0xb2, 0xdd, 0x6f, 0x57, 0xd3, 0x7c, 0x08, 0x4d, 0xa1, 0xee, 0x43,
-	0xea, 0x4d, 0xac, 0xc0, 0xe7, 0x06, 0xd6, 0x20, 0x33, 0x50, 0x65, 0x94, 0xb8, 0xa1, 0x23, 0xc7,
-	0x4f, 0x8a, 0xbd, 0x95, 0xe4, 0x97, 0xa7, 0x40, 0xca, 0x9f, 0x4b, 0x19, 0xaf, 0x4a, 0x4f, 0x7f,
-	0x28, 0xbd, 0x1f, 0xe4, 0xd4, 0x7e, 0x0a, 0xb2, 0x60, 0x85, 0x50, 0xdd, 0x38, 0xa1, 0xa6, 0xea,
-	0x98, 0x83, 0xf7, 0xa3, 0x38, 0xd0, 0x2d, 0xe5, 0x4b, 0x79, 0x07, 0x1b, 0x71, 0xcd, 0x6d, 0xbb,
-	0x3e, 0x4d, 0xe2, 0xe6, 0x4a, 0xa7, 0xb8, 0x42, 0x40, 0x33, 0x74, 0x63, 0x8b, 0xf9, 0xc1, 0x82,
-	0xff, 0x33, 0x09, 0x1e, 0x26, 0xbb, 0x15, 0xce, 0xe9, 0xc8, 0xd1, 0x8d, 0x53, 0xc7, 0xfd, 0x96,
-	0xb7, 0xd3, 0x63, 0xb7, 0xe9, 0xaf, 0x5e, 0xea, 0x67, 0x50, 0x4b, 0x85, 0xce, 0xec, 0x67, 0xa5,
-	0x87, 0x81, 0x44, 0xea, 0xbe, 0xf2, 0xeb, 0xc4, 0x51, 0x8b, 0x8c, 0x7b, 0x86, 0x75, 0x69, 0x56,
-	0xc7, 0x69, 0xd8, 0x2e, 0x5c, 0x22, 0x6c, 0xff, 0xa3, 0x04, 0x9b, 0x33, 0xc9, 0xcc, 0x25, 0xd7,
-	0x99, 0x4b, 0x4e, 0x0a, 0x0b, 0xba, 0xdf, 0x5f, 0x00, 0xb2, 0x75, 0x3f, 0xd0, 0xb2, 0x81, 0x8d,
-	0x99, 0x5d, 0x91, 0x5f, 0x1d, 0x34, 0xd9, 0xd8, 0x30, 0x0d, 0x70, 0xf3, 0x4d, 0xcd, 0xd2, 0x82,
-	0xa6, 0xa6, 0xf2, 0x01, 0xea, 0x82, 0xe5, 0xc8, 0x6b, 0xad, 0x60, 0x34, 0x09, 0x9b, 0x85, 0x8f,
-	0x0f, 0x9b, 0xc5, 0x7c, 0xd8, 0x6c, 0x24, 0xc7, 0xf1, 0xd0, 0x72, 0xc6, 0xd9, 0x57, 0xd7, 0x19,
-	0x67, 0x8d, 0x51, 0x68, 0x7f, 0x18, 0xe8, 0xc1, 0x4a, 0x41, 0xae, 0xea, 0xca, 0x28, 0xff, 0x5d,
-	0x82, 0xdb, 0x8b, 0x08, 0x93, 0xc5, 0xf9, 0xf9, 0xdc, 0x02, 0x5f, 0x02, 0xf0, 0x8d, 0x69, 0x86,
-	0x6b, 0x52, 0xd1, 0x5d, 0x5c, 0x22, 0x85, 0x2a, 0x47, 0xee, 0xba, 0x26, 0xcb, 0x2d, 0x1b, 0xd1,
-	0xcc, 0x54, 0x1e, 0x3c, 0x01, 0xe5, 0xc0, 0x38, 0x71, 0xb8, 0x03, 0x30, 0xf1, 0xc7, 0x44, 0x0f,
-	0xe8, 0x40, 0x34, 0x61, 0x25, 0x92, 0x81, 0xb0, 0x62, 0x67, 0xe2, 0x8f, 0x45, 0xf2, 0x3d, 0x0d,
-	0x03, 0x86, 0x55, 0xe6, 0x58, 0x73, 0x70, 0x81, 0xcb, 0x66, 0x26, 0xc7, 0x8e, 0x17, 0x0a, 0x11,
-	0x6e, 0x0e, 0x8e, 0x15, 0xc8, 0x35, 0x9e, 0x44, 0x75, 0x90, 0x6f, 0x46, 0x3d, 0x02, 0xa4, 0x9f,
-	0xe9, 0x96, 0xad, 0x1f, 0xdb, 0x89, 0x03, 0x67, 0x95, 0x41, 0x89, 0xcc, 0xc1, 0xf1, 0x16, 0xb4,
-	0x42, 0x76, 0xc4, 0xd3, 0xb3, 0xcd, 0x1b, 0x4e, 0x25, 0x32, 0x0b, 0xc6, 0xbb, 0x70, 0xfb, 0xd8,
-	0x76, 0x19, 0x28, 0xd6, 0xc7, 0xc0, 0x39, 0x12, 0x38, 0xfe, 0xd8, 0x97, 0x81, 0xb7, 0x8b, 0x96,
-	0xe2, 0x30, 0x23, 0xd3, 0x4d, 0x93, 0xc5, 0x51, 0xde, 0x5d, 0xaa, 0x92, 0xf8, 0x95, 0x85, 0x1c,
-	0x23, 0x6e, 0x4c, 0x0e, 0x2d, 0xc7, 0x88, 0xee, 0x26, 0xaa, 0x64, 0x06, 0x8a, 0xb1, 0xb8, 0xa2,
-	0x6c, 0xf0, 0xd1, 0xe8, 0x1e, 0x92, 0x85, 0xab, 0x48, 0x4e, 0xea, 0x87, 0xa9, 0xe5, 0x51, 0x93,
-	0xdf, 0x34, 0x48, 0x64, 0x06, 0x2a, 0x74, 0xb6, 0xab, 0x1b, 0xa7, 0xb6, 0x3b, 0xe6, 0x77, 0x0c,
-	0x25, 0x92, 0x81, 0x28, 0xbf, 0x84, 0x4f, 0x84, 0xc5, 0xbd, 0xa6, 0xc1, 0xbe, 0xee, 0x67, 0x3a,
-	0x6a, 0x3f, 0xd4, 0xb5, 0x7e, 0x9f, 0x76, 0x91, 0x66, 0x69, 0x27, 0x06, 0xdd, 0x85, 0x16, 0x77,
-	0x1b, 0x99, 0x60, 0x25, 0xad, 0xce, 0x37, 0x1b, 0x76, 0x8e, 0xd1, 0x15, 0x7c, 0xfc, 0xbb, 0x94,
-	0xa4, 0x1b, 0xaf, 0x69, 0xc0, 0xe3, 0x98, 0x3f, 0x78, 0xcf, 0xac, 0xc6, 0x9f, 0xea, 0xc6, 0xca,
-	0x43, 0x75, 0x1b, 0xaa, 0x4e, 0x8c, 0x2b, 0x5c, 0x5f, 0x0a, 0xc0, 0x7d, 0x28, 0x4d, 0xd8, 0x61,
-	0x2b, 0x2e, 0x69, 0xf1, 0x2d, 0x5a, 0x75, 0xfb, 0xc0, 0x35, 0xe9, 0x4b, 0x38, 0x54, 0xc9, 0xb0,
-	0x37, 0x1c, 0xa9, 0xfd, 0x11, 0xe1, 0x74, 0x94, 0x67, 0x50, 0x62, 0x23, 0x2c, 0x89, 0x4b, 0xc7,
-	0xd0, 0x15, 0x8c, 0xa1, 0xd9, 0x1f, 0xf4, 0xb5, 0x0c, 0x4c, 0xc2, 0xeb, 0x50, 0xec, 0xec, 0xef,
-	0xa3, 0x82, 0xf2, 0x2b, 0xb8, 0xbf, 0x64, 0xa9, 0xcb, 0x7a, 0x8f, 0x4d, 0x58, 0xe3, 0xd5, 0x6c,
-	0x14, 0xb9, 0xaa, 0x44, 0xbc, 0x29, 0x4e, 0x52, 0xe3, 0xbc, 0xa6, 0x81, 0xb8, 0x6a, 0x5f, 0x41,
-	0x2a, 0xa9, 0x92, 0x0b, 0xd9, 0x2a, 0x79, 0xde, 0xeb, 0x17, 0x17, 0x79, 0xfd, 0xff, 0x92, 0x92,
-	0x04, 0x24, 0x59, 0xf0, 0xff, 0x89, 0x07, 0x4c, 0x43, 0x6e, 0xe9, 0x12, 0x9d, 0xe0, 0xf9, 0xfd,
-	0x96, 0x17, 0xed, 0xf7, 0x9f, 0x6f, 0x40, 0x6d, 0x57, 0x67, 0x39, 0x0d, 0xdf, 0x33, 0xde, 0x11,
-	0xc7, 0x5d, 0xe2, 0x51, 0xec, 0x4e, 0x7e, 0x89, 0x0c, 0x62, 0xfe, 0xb3, 0x84, 0x75, 0xe1, 0x34,
-	0x44, 0x32, 0x70, 0x7b, 0xa1, 0x25, 0x8a, 0x3e, 0x07, 0x89, 0x91, 0xf1, 0xcf, 0xa0, 0x9a, 0x38,
-	0x1b, 0x91, 0x26, 0xde, 0x59, 0x36, 0x93, 0x9a, 0x24, 0x9d, 0xc0, 0x66, 0x27, 0x29, 0xb0, 0x90,
-	0xc8, 0x9d, 0xe5, 0x4d, 0x6e, 0x92, 0x4e, 0xc0, 0x5f, 0x41, 0x25, 0x4e, 0x21, 0xb8, 0x60, 0x6a,
-	0x0b, 0x2e, 0x90, 0xb3, 0xe9, 0x0a, 0x49, 0xd0, 0xf1, 0x63, 0x28, 0xf9, 0xd4, 0x89, 0xfa, 0x72,
-	0xb5, 0x59, 0x05, 0x67, 0xbb, 0x04, 0x1c, 0x0d, 0x77, 0xa1, 0xce, 0x7e, 0x35, 0x2f, 0x6a, 0x1a,
-	0x88, 0x36, 0x47, 0xfb, 0xe2, 0x69, 0x11, 0x1e, 0xa9, 0xf9, 0x99, 0x4e, 0xc3, 0x1f, 0x00, 0x70,
-	0x22, 0x51, 0x8a, 0x51, 0x59, 0xb6, 0xdb, 0xb8, 0x15, 0x40, 0xaa, 0x7e, 0xd2, 0x15, 0x78, 0x91,
-	0xe6, 0x1a, 0xd5, 0x25, 0x1a, 0x12, 0x96, 0x96, 0xf6, 0xbe, 0x1e, 0x41, 0x51, 0x37, 0x4e, 0x79,
-	0xa4, 0xa9, 0xcd, 0x5e, 0x15, 0xa6, 0xc5, 0x26, 0x61, 0x48, 0x4c, 0x2c, 0xef, 0x6d, 0xf7, 0x5b,
-	0x1e, 0x67, 0x2e, 0x12, 0x0b, 0xab, 0x86, 0x08, 0x47, 0xc3, 0xbb, 0x50, 0x0b, 0xd3, 0x1a, 0x46,
-	0x5c, 0x6e, 0x2c, 0x96, 0x4a, 0xa6, 0xd6, 0x21, 0xd9, 0x49, 0x6c, 0x5b, 0x7e, 0x94, 0x46, 0xf2,
-	0xf0, 0x74, 0xd1, 0xb6, 0x44, 0xaa, 0x49, 0x62, 0x64, 0xfc, 0x24, 0xce, 0xd5, 0x9a, 0x7c, 0xd6,
-	0xcd, 0x85, 0xb3, 0x72, 0xc9, 0x5a, 0x0f, 0x9a, 0x06, 0x4b, 0xfd, 0xb5, 0xc4, 0x68, 0x5a, 0x7c,
-	0xaa, 0xb2, 0xd8, 0x5e, 0xb3, 0xd5, 0x07, 0x69, 0x18, 0xb9, 0x62, 0x24, 0x21, 0x15, 0x07, 0x33,
-	0x7e, 0xa7, 0xbe, 0x94, 0x54, 0x1c, 0xdb, 0x05, 0xa9, 0xa4, 0xfe, 0x18, 0xf0, 0x8b, 0xc2, 0x28,
-	0x39, 0x8e, 0x05, 0x71, 0x95, 0x13, 0xfb, 0xd1, 0x52, 0x63, 0x8e, 0x05, 0xd2, 0x9a, 0xce, 0x24,
-	0xe3, 0x8f, 0xa1, 0x34, 0xb5, 0x9c, 0xb1, 0x8c, 0x97, 0xe8, 0x90, 0xe5, 0xa4, 0x84, 0xa3, 0x71,
-	0x74, 0xd7, 0x19, 0xcb, 0xd7, 0x96, 0xa1, 0xbb, 0x1c, 0xdd, 0x75, 0xc6, 0xf8, 0x4f, 0xe1, 0xae,
-	0xb7, 0xbc, 0xcc, 0x91, 0x37, 0x38, 0xa5, 0xe7, 0x0b, 0x29, 0xad, 0x28, 0x91, 0xc8, 0x2a, 0xe2,
-	0xf8, 0x8f, 0xe1, 0x6a, 0x72, 0x67, 0x12, 0x5f, 0x6d, 0xc8, 0xd7, 0xf9, 0x8a, 0x8f, 0x3f, 0xee,
-	0x3e, 0x64, 0x9e, 0x0e, 0xf6, 0x33, 0x37, 0xfb, 0xb3, 0xf7, 0x26, 0xf2, 0x26, 0x5f, 0xe4, 0x27,
-	0xbf, 0xd5, 0xa5, 0x0b, 0xb9, 0x98, 0x2e, 0x3b, 0x44, 0x76, 0xda, 0x5e, 0x97, 0x3f, 0x59, 0x72,
-	0x88, 0xb2, 0x6d, 0xf8, 0xec, 0x24, 0xfc, 0x0d, 0x5c, 0xb3, 0xe7, 0x5b, 0xf4, 0xb2, 0xcc, 0x69,
-	0x6d, 0x5d, 0xb6, 0xa5, 0x4f, 0x16, 0x11, 0xc1, 0x6f, 0xd2, 0xab, 0x5c, 0x5e, 0x4b, 0xc8, 0x37,
-	0x96, 0x99, 0x7a, 0xae, 0xea, 0xc8, 0x4f, 0xc4, 0xbf, 0x81, 0xeb, 0xc6, 0xa2, 0xaa, 0x44, 0xbe,
-	0xc9, 0x29, 0x3e, 0xba, 0x04, 0xc5, 0x98, 0xd3, 0xc5, 0x84, 0xf0, 0x08, 0xae, 0x7a, 0xb3, 0x2d,
-	0x07, 0xf9, 0x16, 0xa7, 0xfe, 0xf0, 0x02, 0x7b, 0x9c, 0xc1, 0x26, 0xf3, 0x04, 0xa2, 0x60, 0x41,
-	0x4f, 0xe5, 0xdb, 0x4b, 0x83, 0x05, 0x3d, 0x25, 0x1c, 0x0d, 0xff, 0x1c, 0xd0, 0x78, 0x26, 0x5d,
-	0x95, 0x3f, 0xe5, 0x53, 0x1f, 0x5c, 0x94, 0xdd, 0xe5, 0x73, 0xdb, 0xb9, 0xe9, 0xd8, 0x02, 0x79,
-	0x7c, 0x41, 0x06, 0x2c, 0xdf, 0x59, 0x62, 0xfc, 0x17, 0xa5, 0xcd, 0xe4, 0x42, 0x72, 0x58, 0x83,
-	0xcd, 0xa8, 0x2f, 0x96, 0xf8, 0x36, 0xcd, 0xe0, 0x5d, 0x35, 0xf9, 0x2e, 0x5f, 0xe8, 0xf3, 0x0b,
-	0x22, 0xc8, 0x7c, 0x1b, 0x8e, 0x6c, 0xe8, 0x8b, 0x9a, 0x73, 0xbf, 0x86, 0x8d, 0xf1, 0x82, 0x24,
-	0x53, 0x6e, 0x2f, 0x21, 0xbf, 0x30, 0x2b, 0x5d, 0x48, 0x06, 0x87, 0x70, 0x7b, 0xbc, 0x24, 0x87,
-	0x95, 0xef, 0xf1, 0x65, 0x9e, 0x5e, 0x7e, 0x99, 0x58, 0x64, 0x4b, 0xc9, 0xb2, 0x4c, 0x66, 0x1c,
-	0xe7, 0x9a, 0xb2, 0xb2, 0x24, 0xb6, 0xa7, 0x19, 0x69, 0x3a, 0x81, 0xd9, 0xed, 0x78, 0x36, 0x53,
-	0x95, 0xef, 0x2f, 0xb1, 0xdb, 0xb9, 0xbc, 0x96, 0xcc, 0x13, 0x50, 0xfe, 0xae, 0x2c, 0xbe, 0x19,
-	0xad, 0xc1, 0x7a, 0x77, 0xd0, 0xef, 0xab, 0xdd, 0x11, 0x2a, 0xe0, 0x06, 0x54, 0xc5, 0x8b, 0xba,
-	0x87, 0x8a, 0xec, 0x75, 0x78, 0xb4, 0x3b, 0xec, 0x92, 0xde, 0xae, 0x8a, 0x4a, 0xfc, 0xf3, 0x51,
-	0x32, 0xd8, 0x3b, 0xea, 0xaa, 0x04, 0x95, 0x71, 0x05, 0x4a, 0x43, 0xb5, 0xbf, 0x87, 0xd6, 0x30,
-	0x82, 0x3a, 0x7b, 0xd2, 0x88, 0xda, 0x55, 0x7b, 0x87, 0x23, 0xb4, 0xce, 0x0a, 0x0c, 0x0e, 0x51,
-	0x09, 0x19, 0x10, 0x54, 0x61, 0x8b, 0x1c, 0xa8, 0xc3, 0x61, 0xe7, 0xb5, 0x8a, 0xaa, 0xbc, 0xb2,
-	0xe8, 0xbe, 0x45, 0xc0, 0x28, 0xbc, 0xda, 0x1f, 0xfc, 0x02, 0xd5, 0x70, 0x0b, 0x6a, 0x47, 0xfd,
-	0x74, 0xa9, 0x3a, 0xbf, 0x1a, 0x3e, 0xea, 0x76, 0xd5, 0xe1, 0x10, 0x35, 0x70, 0x15, 0xca, 0x11,
-	0xa1, 0x26, 0xab, 0x54, 0xba, 0xfb, 0x83, 0xa1, 0xaa, 0x25, 0x8c, 0xb4, 0x52, 0x58, 0x77, 0xd0,
-	0x1f, 0x1e, 0x1d, 0xa8, 0x04, 0x21, 0xbc, 0x01, 0x28, 0xc6, 0xd0, 0x62, 0x42, 0x57, 0xd9, 0x82,
-	0x87, 0xbd, 0xfe, 0x6b, 0x84, 0xf9, 0xd3, 0xa0, 0xff, 0x1a, 0x5d, 0xc3, 0x0f, 0xe0, 0x1e, 0x51,
-	0xf7, 0xd4, 0xfd, 0xde, 0x3b, 0x95, 0x68, 0x47, 0xfd, 0x4e, 0xf7, 0x6d, 0x7f, 0xf0, 0x8b, 0x7d,
-	0x75, 0xef, 0xb5, 0xba, 0xa7, 0x09, 0x9e, 0x87, 0x68, 0x03, 0xcb, 0xb0, 0x71, 0xd8, 0x21, 0xa3,
-	0xde, 0xa8, 0x37, 0xe8, 0xf3, 0x91, 0x51, 0x67, 0xaf, 0x33, 0xea, 0xa0, 0xeb, 0xf8, 0x1e, 0x7c,
-	0xba, 0x68, 0x44, 0x23, 0xea, 0xf0, 0x70, 0xd0, 0x1f, 0xaa, 0x68, 0x93, 0x7f, 0x0c, 0x31, 0x18,
-	0xbc, 0x3d, 0x3a, 0x44, 0x9f, 0xe0, 0x6b, 0xd0, 0x8a, 0x9e, 0x53, 0x04, 0x99, 0x6f, 0x41, 0x30,
-	0xaf, 0x0d, 0x47, 0x9d, 0xd1, 0x10, 0xdd, 0xc0, 0xb7, 0xe0, 0x93, 0x3c, 0x2c, 0x9d, 0x70, 0x93,
-	0xb1, 0x43, 0xd4, 0x4e, 0xf7, 0x8d, 0xba, 0xa7, 0x31, 0x39, 0x0f, 0x5e, 0x69, 0xa3, 0xc1, 0x61,
-	0xaf, 0x8b, 0x6e, 0x45, 0x6a, 0x51, 0xdf, 0xa2, 0xdb, 0xf8, 0x13, 0xb8, 0xf6, 0x5a, 0x1d, 0x69,
-	0xfb, 0x9d, 0xe1, 0x28, 0xde, 0x89, 0xd6, 0xdb, 0x43, 0x9f, 0xe2, 0x36, 0xdc, 0x5e, 0x30, 0x90,
-	0x92, 0xbf, 0x83, 0x6f, 0xc2, 0x66, 0xa7, 0x3b, 0xea, 0xbd, 0x4b, 0x65, 0xaa, 0x75, 0xdf, 0x74,
-	0xfa, 0xaf, 0x55, 0x74, 0x97, 0xf1, 0xc5, 0x66, 0xf3, 0xf5, 0x86, 0x6c, 0xe5, 0x7e, 0xe7, 0x40,
-	0x1d, 0x1e, 0x76, 0xba, 0x2a, 0x6a, 0xe3, 0x1f, 0x41, 0xfb, 0x82, 0xc1, 0x94, 0xfc, 0x3d, 0x66,
-	0x1e, 0x0c, 0x6b, 0xd8, 0x7d, 0xa3, 0x1e, 0x74, 0x90, 0x12, 0x73, 0x1a, 0xbd, 0xa7, 0x88, 0xf7,
-	0x1f, 0x7d, 0xc9, 0xef, 0x48, 0xb3, 0x1f, 0x7a, 0xf2, 0x6f, 0x9c, 0x07, 0x7d, 0x15, 0x5d, 0x61,
-	0x76, 0xb4, 0xff, 0xcd, 0xf3, 0xe8, 0x03, 0xe7, 0x6f, 0xf6, 0x7b, 0xbb, 0xa8, 0xc0, 0x9f, 0x86,
-	0xa3, 0x3d, 0x54, 0x7c, 0xf4, 0xaf, 0x45, 0xa8, 0x65, 0x8a, 0x31, 0x66, 0xa3, 0x47, 0x0e, 0xcb,
-	0x19, 0xc4, 0x3d, 0xc1, 0x15, 0x7c, 0x15, 0x1a, 0x71, 0xbc, 0xcd, 0x5c, 0x40, 0x1c, 0xb2, 0xba,
-	0xc9, 0x0f, 0xa8, 0x63, 0x88, 0x5b, 0x86, 0x02, 0xe3, 0xae, 0x13, 0x06, 0x27, 0xd4, 0x09, 0x2c,
-	0x23, 0xbd, 0xe5, 0x40, 0x45, 0xbc, 0x09, 0xb8, 0x13, 0xdd, 0x4a, 0x7f, 0x97, 0x81, 0x97, 0xd8,
-	0x5a, 0xb1, 0x5f, 0xdb, 0x0d, 0xfd, 0x73, 0x54, 0x66, 0x4a, 0x17, 0xf7, 0xc5, 0x7d, 0x37, 0x20,
-	0x54, 0x37, 0xcf, 0xd1, 0x1a, 0xb3, 0xbc, 0x38, 0x61, 0xdb, 0x8d, 0x7a, 0x3c, 0x3f, 0x0f, 0xdd,
-	0x40, 0x57, 0x3f, 0x18, 0x94, 0x9a, 0x34, 0xca, 0x4f, 0xd1, 0x3a, 0xfe, 0x1c, 0x1e, 0x2c, 0x45,
-	0xfb, 0x60, 0xd0, 0xe8, 0x62, 0xa5, 0xc2, 0xb6, 0x14, 0x5f, 0xa0, 0x44, 0xb3, 0xab, 0x4c, 0x5b,
-	0x2c, 0xbd, 0x9e, 0x4e, 0x5d, 0x2f, 0xa0, 0xa6, 0xa8, 0x0a, 0xa3, 0x41, 0x60, 0xf8, 0xdc, 0x6b,
-	0xf5, 0xdd, 0xe0, 0x95, 0x1b, 0x3a, 0x26, 0xaa, 0x31, 0xc3, 0x1a, 0x66, 0x3e, 0x17, 0x4b, 0x46,
-	0xea, 0xfc, 0x76, 0x26, 0x6e, 0x8a, 0xc5, 0xd0, 0x06, 0xdb, 0xd9, 0xc8, 0x75, 0x0f, 0x74, 0xe7,
-	0x9c, 0x44, 0x75, 0xb2, 0x8f, 0x9a, 0x8c, 0x08, 0xa7, 0x3b, 0xa2, 0xde, 0xc4, 0x72, 0xf4, 0x20,
-	0xde, 0x4c, 0x8b, 0x89, 0x26, 0xd9, 0x0c, 0x13, 0x0d, 0x3f, 0xa9, 0x3d, 0x87, 0x5f, 0x5e, 0x45,
-	0xac, 0xe8, 0x13, 0x8a, 0xae, 0x32, 0xd1, 0xf6, 0xf8, 0x15, 0x92, 0x1e, 0x58, 0xc7, 0x36, 0x8d,
-	0x9c, 0x17, 0xc2, 0x8f, 0xde, 0x02, 0xa4, 0xdf, 0x47, 0xb0, 0x63, 0x93, 0xbe, 0x89, 0x2f, 0xdf,
-	0xaf, 0x41, 0x2b, 0x85, 0xfd, 0xd2, 0xd0, 0xdf, 0x3d, 0x8d, 0x14, 0x9b, 0x02, 0x3b, 0x4c, 0x97,
-	0x3e, 0x2a, 0x3c, 0xfa, 0x5e, 0x82, 0xd6, 0xe1, 0xcc, 0x47, 0x89, 0x6b, 0x50, 0x38, 0x7b, 0x82,
-	0xae, 0xf0, 0x5f, 0x36, 0x93, 0xfd, 0xee, 0xa0, 0x02, 0xff, 0x7d, 0x86, 0x8a, 0xfc, 0xf7, 0x39,
-	0x2a, 0xf1, 0xdf, 0x9f, 0xa0, 0x32, 0xff, 0x7d, 0x81, 0xd6, 0xf8, 0xef, 0xef, 0xa3, 0x75, 0xfe,
-	0xfb, 0x25, 0xaa, 0xf0, 0xdf, 0xaf, 0x22, 0x67, 0x77, 0xf6, 0xf4, 0x09, 0x82, 0xe8, 0xe1, 0x29,
-	0xaa, 0x45, 0x0f, 0x3b, 0xa8, 0x1e, 0x3d, 0x3c, 0x43, 0x8d, 0xdd, 0x87, 0xa0, 0xb8, 0xde, 0x78,
-	0x5b, 0x9f, 0xb2, 0xe4, 0x22, 0x76, 0xe9, 0x86, 0x3b, 0x99, 0xb8, 0xce, 0xb6, 0x1e, 0xff, 0x33,
-	0xe1, 0x4d, 0xf1, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x99, 0x3c, 0x38, 0xad, 0x30, 0x00,
-	0x00,
+var fileDescriptor_PulsarApi_976de857822f53b5 = []byte{
+	// 4429 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x5b, 0x4f, 0x8f, 0xdb, 0x48,
+	0x76, 0x37, 0xf5, 0xa7, 0x5b, 0x7a, 0xfa, 0x57, 0xae, 0x6e, 0x7b, 0x34, 0xb6, 0xc7, 0x96, 0xe9,
+	0x3f, 0xdb, 0xe3, 0x1d, 0x77, 0xec, 0xb6, 0xd7, 0x99, 0xf1, 0x6e, 0x80, 0x51, 0xab, 0x69, 0x5b,
+	0x71, 0x5b, 0xd2, 0x94, 0xd4, 0x5e, 0xcc, 0x64, 0x17, 0x5c, 0x36, 0x59, 0x56, 0x13, 0x4d, 0x91,
+	0x0a, 0x49, 0xf5, 0xb8, 0xe7, 0x90, 0xdb, 0x22, 0x97, 0x20, 0x41, 0x92, 0x63, 0x80, 0x2c, 0x72,
+	0xc8, 0x07, 0xc8, 0x2d, 0xc0, 0x7e, 0x83, 0xfd, 0x06, 0x39, 0x05, 0x58, 0x20, 0xc7, 0x05, 0x02,
+	0x24, 0x1f, 0x20, 0xa8, 0x62, 0xf1, 0x9f, 0x44, 0x49, 0xed, 0x99, 0x3d, 0xe4, 0x24, 0xf2, 0xd5,
+	0xab, 0x57, 0xaf, 0x5e, 0xfd, 0xea, 0xbd, 0x57, 0xaf, 0x28, 0x68, 0x0c, 0x66, 0x96, 0xa7, 0xb9,
+	0xed, 0xa9, 0xb9, 0x3b, 0x75, 0x1d, 0xdf, 0xc1, 0xd5, 0x29, 0x27, 0x04, 0x6f, 0xf2, 0xef, 0x73,
+	0xb0, 0x31, 0xd4, 0x4f, 0xe8, 0x44, 0xc3, 0x18, 0x0a, 0xb6, 0x36, 0xa1, 0x4d, 0xa9, 0x95, 0xdb,
+	0x29, 0x13, 0xfe, 0x8c, 0x6f, 0x41, 0xc5, 0xe3, 0xad, 0xaa, 0xa1, 0xf9, 0x5a, 0x33, 0xdf, 0xca,
+	0xed, 0x54, 0x09, 0x04, 0xa4, 0x03, 0xcd, 0xd7, 0xf0, 0x43, 0x28, 0xf8, 0xe7, 0x53, 0xda, 0x2c,
+	0xb4, 0x72, 0x3b, 0xf5, 0xbd, 0x8f, 0x77, 0x93, 0xc2, 0x77, 0x03, 0xc1, 0xbb, 0xa3, 0xf3, 0x29,
+	0x25, 0x9c, 0x0d, 0x3f, 0x03, 0x98, 0xba, 0xce, 0x94, 0xba, 0xbe, 0x49, 0xbd, 0x66, 0xb1, 0x95,
+	0xdf, 0xa9, 0xec, 0x5d, 0x4d, 0x77, 0x7a, 0x4d, 0xcf, 0xdf, 0x6a, 0xd6, 0x8c, 0x92, 0x04, 0xa7,
+	0xfc, 0x5b, 0x09, 0x0a, 0x4c, 0x0c, 0x2e, 0x41, 0xa1, 0xe7, 0xd8, 0x14, 0x5d, 0xc2, 0x00, 0x1b,
+	0x43, 0xdf, 0x35, 0xed, 0x31, 0x92, 0x18, 0xf5, 0xcf, 0x3d, 0xc7, 0x46, 0x39, 0x5c, 0x85, 0xd2,
+	0x80, 0x89, 0x39, 0x9e, 0xbd, 0x43, 0x79, 0x46, 0x6f, 0x9f, 0xb9, 0x0e, 0x2a, 0xb0, 0xa7, 0x7d,
+	0xc7, 0xb1, 0x50, 0x91, 0x3d, 0x75, 0x6d, 0xff, 0x73, 0xb4, 0x81, 0xcb, 0x50, 0xec, 0xda, 0xfe,
+	0xe3, 0x67, 0x68, 0x53, 0x3c, 0x3e, 0xd9, 0x43, 0x25, 0xf1, 0xf8, 0xec, 0x29, 0x2a, 0xb3, 0xc7,
+	0x17, 0x96, 0xa3, 0xf9, 0x08, 0xd8, 0x68, 0x07, 0xce, 0xec, 0xd8, 0xa2, 0xa8, 0xc2, 0x24, 0x1c,
+	0x68, 0x3e, 0x45, 0x55, 0xf6, 0x34, 0x32, 0x27, 0x14, 0xd5, 0x70, 0x0d, 0xca, 0xec, 0xc9, 0xf3,
+	0xb5, 0xc9, 0x14, 0xd5, 0x99, 0x1a, 0xe1, 0x3c, 0x50, 0x43, 0xfe, 0x1b, 0x09, 0x6a, 0x6f, 0xa8,
+	0xe7, 0x69, 0x63, 0xda, 0x35, 0xb8, 0xd9, 0xae, 0x41, 0xc9, 0xa2, 0xc6, 0x98, 0xba, 0x5d, 0x83,
+	0xdb, 0xbb, 0x40, 0xa2, 0x77, 0xdc, 0x84, 0x4d, 0x6a, 0xfb, 0xee, 0x79, 0xd7, 0x68, 0xe6, 0x78,
+	0x53, 0xf8, 0x8a, 0x5b, 0x50, 0x9e, 0x6a, 0xae, 0x6f, 0xfa, 0xa6, 0x63, 0x37, 0xf3, 0x2d, 0x69,
+	0xa7, 0xf8, 0x3c, 0xf7, 0xf0, 0x31, 0x89, 0x89, 0xf8, 0x0e, 0x54, 0x8e, 0x35, 0x5f, 0x3f, 0x51,
+	0x4d, 0xdb, 0xa0, 0xef, 0x9b, 0x85, 0x88, 0x07, 0x38, 0xb9, 0xcb, 0xa8, 0xf2, 0x5e, 0xac, 0x1c,
+	0x46, 0x90, 0x3f, 0xa5, 0xe7, 0x62, 0xcd, 0xd9, 0x23, 0xde, 0x86, 0xe2, 0x19, 0x6b, 0xe2, 0x83,
+	0x97, 0x49, 0xf0, 0x22, 0x3f, 0x83, 0xea, 0x6b, 0x7a, 0x7e, 0xe8, 0xd8, 0xe3, 0x0b, 0xf5, 0x2b,
+	0x84, 0xfd, 0x2c, 0xa8, 0x2b, 0xb6, 0xee, 0x9e, 0x4f, 0x99, 0x7a, 0xaf, 0xe9, 0xb9, 0xb7, 0xae,
+	0x67, 0x55, 0xf4, 0xc4, 0x7b, 0x50, 0x9a, 0x50, 0x5f, 0x13, 0xb8, 0x5b, 0x05, 0x94, 0x88, 0x4f,
+	0xfe, 0xfd, 0x06, 0x34, 0x84, 0xa1, 0xdf, 0x08, 0x1a, 0xbe, 0x03, 0xb5, 0xa9, 0xeb, 0x18, 0x33,
+	0x9d, 0xba, 0x6a, 0x02, 0xdf, 0xd5, 0x90, 0xd8, 0x0b, 0x71, 0x4e, 0xff, 0x72, 0x46, 0x6d, 0x9d,
+	0xaa, 0x66, 0x68, 0x77, 0x08, 0x49, 0x5d, 0x03, 0xdf, 0x86, 0xea, 0x74, 0x76, 0x6c, 0x99, 0xde,
+	0x89, 0xea, 0x9b, 0x13, 0xca, 0x77, 0x42, 0x81, 0x54, 0x04, 0x8d, 0x2d, 0xfd, 0x1c, 0xb6, 0x0b,
+	0x17, 0xc5, 0x36, 0xfe, 0x11, 0x34, 0x5c, 0x3a, 0xb5, 0x4c, 0x5d, 0xf3, 0xa9, 0xa1, 0xbe, 0x73,
+	0x9d, 0x49, 0xb3, 0xd8, 0x92, 0x76, 0xca, 0xa4, 0x1e, 0x93, 0x5f, 0xb8, 0xce, 0x84, 0xcf, 0x24,
+	0x5c, 0x69, 0x95, 0xd9, 0x70, 0x83, 0xb3, 0x55, 0x23, 0xe2, 0x6b, 0x7a, 0xce, 0x14, 0x8d, 0xba,
+	0xa9, 0xbe, 0xd3, 0xdc, 0x6c, 0xe5, 0x77, 0xca, 0xa4, 0x12, 0xd1, 0x46, 0x0e, 0x56, 0xa0, 0xa2,
+	0x3b, 0x93, 0xa9, 0x4b, 0x3d, 0x8f, 0x01, 0xa9, 0xd4, 0x92, 0x76, 0xea, 0x7b, 0x9f, 0xa4, 0x35,
+	0xed, 0xc4, 0x0c, 0x6c, 0xdf, 0x3d, 0x2f, 0xf4, 0xfa, 0x3d, 0x85, 0x24, 0xfb, 0xe1, 0x5d, 0xb8,
+	0x3c, 0xb3, 0x43, 0x02, 0x35, 0x54, 0xcf, 0xfc, 0x8e, 0x36, 0xcb, 0x2d, 0x69, 0xa7, 0xf6, 0x5c,
+	0x7a, 0x44, 0x50, 0xb2, 0x6d, 0x68, 0x7e, 0x47, 0xf1, 0x53, 0xb8, 0x62, 0xcf, 0x26, 0xea, 0x24,
+	0x58, 0x1f, 0x4f, 0x35, 0x6d, 0x95, 0x83, 0xb2, 0x59, 0xe1, 0x28, 0x95, 0x1e, 0x13, 0x6c, 0xcf,
+	0x26, 0x62, 0xf9, 0xbc, 0xae, 0xbd, 0xcf, 0x1a, 0x71, 0x0b, 0x80, 0x9e, 0x51, 0xdb, 0x0f, 0xcc,
+	0x5e, 0x6d, 0x49, 0x3b, 0x05, 0x26, 0xbe, 0xcc, 0x89, 0xdc, 0xee, 0x0a, 0x34, 0x68, 0x04, 0x31,
+	0x66, 0x17, 0xaf, 0x59, 0xe3, 0xc6, 0xbf, 0x91, 0x9e, 0x52, 0x1a, 0x87, 0xa4, 0x4e, 0xd3, 0xb8,
+	0xfc, 0x51, 0x4a, 0x8c, 0x66, 0x8d, 0x9d, 0x66, 0x3d, 0x58, 0x86, 0x98, 0xdc, 0xb6, 0xc6, 0x0e,
+	0xfe, 0x14, 0x50, 0x82, 0x71, 0xaa, 0xb9, 0xda, 0xa4, 0xd9, 0x68, 0x49, 0x3b, 0x55, 0x92, 0x10,
+	0x30, 0x60, 0x64, 0x7c, 0x0f, 0xea, 0xc2, 0x7d, 0x9e, 0x51, 0x97, 0x1b, 0x1b, 0x71, 0xc6, 0x5a,
+	0x40, 0x7d, 0x1b, 0x10, 0xf1, 0x97, 0xf0, 0x71, 0x6a, 0x61, 0xd5, 0xe3, 0x67, 0x4f, 0x55, 0x6a,
+	0xeb, 0x8e, 0x41, 0x8d, 0xe6, 0xe5, 0x96, 0xb4, 0x53, 0x7a, 0x5e, 0x7c, 0xa7, 0x59, 0x1e, 0x25,
+	0x57, 0x93, 0x6b, 0xbd, 0xff, 0xec, 0xa9, 0x12, 0x30, 0xb1, 0x55, 0x77, 0x5c, 0x83, 0x32, 0x77,
+	0xc8, 0x91, 0x81, 0xf9, 0x30, 0x95, 0x90, 0xc6, 0x80, 0x71, 0x1f, 0x1a, 0x06, 0xb5, 0xcc, 0x33,
+	0xea, 0xaa, 0x9a, 0xb0, 0xe6, 0x56, 0x4b, 0xda, 0xc9, 0x93, 0x9a, 0x20, 0xb7, 0x03, 0x73, 0xde,
+	0x82, 0xca, 0x44, 0x73, 0x4f, 0xa9, 0xab, 0x72, 0xc7, 0xbe, 0xcd, 0x16, 0x87, 0x40, 0x40, 0x62,
+	0x50, 0x90, 0x7f, 0x97, 0x83, 0x2b, 0x43, 0xd3, 0x1e, 0x5b, 0x74, 0x7e, 0xab, 0xa5, 0x77, 0x80,
+	0x74, 0xe1, 0x1d, 0xb0, 0x00, 0xec, 0x5c, 0x36, 0xb0, 0xa7, 0xda, 0xb9, 0xe5, 0x68, 0x02, 0x69,
+	0x6c, 0x07, 0x16, 0x49, 0x45, 0xd0, 0x38, 0xc2, 0x1e, 0x40, 0x8d, 0x61, 0x4e, 0xd3, 0xd9, 0x46,
+	0x72, 0x66, 0x3e, 0xf7, 0x7f, 0x91, 0xed, 0xaa, 0x51, 0x5b, 0x7f, 0xe6, 0xcf, 0xe1, 0xaa, 0x98,
+	0x81, 0xab, 0x95, 0xab, 0xb2, 0xf1, 0x7d, 0x56, 0x65, 0x73, 0x61, 0x55, 0xe4, 0x7f, 0xcd, 0x43,
+	0xbd, 0xe3, 0x4c, 0x26, 0x9a, 0x6d, 0x74, 0x1c, 0xdb, 0xa6, 0xba, 0xcf, 0x40, 0xa3, 0x5b, 0x26,
+	0x53, 0x2d, 0x04, 0x4d, 0xe0, 0xb1, 0x6a, 0x01, 0x35, 0x04, 0xcd, 0x17, 0x50, 0xd1, 0x66, 0xfe,
+	0x89, 0x3a, 0xa1, 0xfe, 0x89, 0x63, 0x70, 0x93, 0xd5, 0xf7, 0x9a, 0x69, 0x6b, 0xb7, 0x67, 0xfe,
+	0xc9, 0x1b, 0xde, 0x4e, 0x40, 0x8b, 0x9e, 0xf1, 0x0e, 0xa0, 0x44, 0xd7, 0xc0, 0x2b, 0x0a, 0x97,
+	0x13, 0x73, 0x71, 0xbf, 0x78, 0x1d, 0xca, 0x9c, 0x53, 0x78, 0x61, 0xa6, 0x7e, 0x89, 0x11, 0x78,
+	0x10, 0xfb, 0x0c, 0x10, 0x1f, 0x46, 0x77, 0xac, 0x48, 0xd5, 0x20, 0xe2, 0x48, 0x8f, 0x48, 0x23,
+	0x6c, 0x0a, 0xf5, 0x7d, 0x08, 0x5b, 0x53, 0xd7, 0x79, 0x7f, 0xae, 0xfa, 0x8e, 0x7a, 0xec, 0x3a,
+	0x0c, 0x60, 0x33, 0xd7, 0x12, 0x3e, 0x0c, 0xf1, 0xa6, 0x91, 0xb3, 0xcf, 0x1b, 0x8e, 0x5c, 0x0b,
+	0x3f, 0x04, 0xec, 0xb8, 0xe6, 0xd8, 0xb4, 0x35, 0x4b, 0x9d, 0xba, 0xa6, 0xad, 0x9b, 0x53, 0xcd,
+	0xe2, 0x16, 0x2c, 0x93, 0xcb, 0x61, 0xcb, 0x20, 0x6c, 0xc0, 0x9f, 0x25, 0xd8, 0x63, 0x8d, 0x4b,
+	0x81, 0xf0, 0xb0, 0xa5, 0x1d, 0x6a, 0xfe, 0x08, 0xb6, 0xd3, 0xdc, 0xc2, 0x88, 0x65, 0xce, 0x8f,
+	0x93, 0xfc, 0x81, 0x31, 0xe4, 0xbf, 0x97, 0x00, 0xa5, 0xd7, 0x89, 0x1a, 0x7c, 0x7b, 0x53, 0x97,
+	0xed, 0xa8, 0xb9, 0x95, 0x0a, 0xa8, 0xe1, 0xcc, 0xb3, 0xec, 0x94, 0x5b, 0x6a, 0xa7, 0x1d, 0x40,
+	0x13, 0xed, 0x7d, 0xe8, 0x26, 0x43, 0xac, 0xb3, 0x4d, 0x58, 0x9f, 0x68, 0xef, 0xc5, 0x96, 0x63,
+	0x70, 0x97, 0xff, 0x59, 0x82, 0x2d, 0xa1, 0x13, 0xd3, 0x94, 0x50, 0x6f, 0xea, 0xd8, 0x1e, 0xcd,
+	0x04, 0x90, 0xb4, 0x08, 0xa0, 0x3d, 0x28, 0xb9, 0xa2, 0x0b, 0x57, 0x67, 0x61, 0xaf, 0x86, 0xe6,
+	0x22, 0x11, 0x5f, 0xe6, 0x54, 0xf2, 0xcb, 0xa6, 0x22, 0xff, 0x8b, 0x04, 0xdb, 0x09, 0x05, 0x3b,
+	0x27, 0x9a, 0x65, 0x51, 0x7b, 0x4c, 0x33, 0x0d, 0x27, 0x2d, 0x1a, 0xee, 0x29, 0x94, 0xf5, 0xb0,
+	0xcf, 0x1a, 0x15, 0x63, 0xc6, 0x0f, 0xd4, 0xf1, 0x2b, 0x28, 0x45, 0xb0, 0xc8, 0xda, 0x17, 0xd2,
+	0xfa, 0x7d, 0x91, 0x4b, 0xef, 0x0b, 0xf9, 0x37, 0x1b, 0x11, 0x56, 0x86, 0xb3, 0x63, 0x4f, 0x77,
+	0xcd, 0x63, 0xca, 0x92, 0x1c, 0xdf, 0x99, 0x9a, 0xba, 0x80, 0x48, 0xf0, 0x82, 0x65, 0xa8, 0x7a,
+	0x01, 0x0b, 0x8f, 0x1a, 0x22, 0xe7, 0x4a, 0xd1, 0xf0, 0x97, 0xb0, 0xe9, 0xcd, 0x8e, 0x99, 0xeb,
+	0xe5, 0x3e, 0xaf, 0xbe, 0x77, 0x7f, 0x21, 0x54, 0xa7, 0x86, 0xda, 0x1d, 0x06, 0xdc, 0x24, 0xec,
+	0xc6, 0x5c, 0xba, 0xee, 0xd8, 0xde, 0x6c, 0x42, 0x5d, 0x96, 0xdd, 0x14, 0x82, 0xec, 0x26, 0x24,
+	0x75, 0x0d, 0xfc, 0x09, 0x80, 0xcb, 0x72, 0x1d, 0xcf, 0x67, 0xed, 0x45, 0xde, 0x5e, 0x16, 0x94,
+	0xae, 0xc1, 0xfc, 0x73, 0xd4, 0x9f, 0x1b, 0x45, 0x24, 0x1e, 0x21, 0x91, 0x9b, 0xe4, 0x1e, 0xd4,
+	0xa7, 0xae, 0xe9, 0xb8, 0xa6, 0x7f, 0xae, 0x5a, 0xf4, 0x8c, 0x06, 0x9b, 0xb5, 0x48, 0x6a, 0x21,
+	0xf5, 0x90, 0x11, 0xf1, 0x4d, 0xd8, 0x34, 0x66, 0xae, 0x76, 0x6c, 0x51, 0xbe, 0x3b, 0x4b, 0xcf,
+	0x0b, 0xbe, 0x3b, 0xa3, 0x24, 0x24, 0x62, 0x05, 0x90, 0xe7, 0x6b, 0xae, 0x1f, 0x6d, 0x00, 0x33,
+	0xd8, 0x96, 0x95, 0xbd, 0xeb, 0xe9, 0x69, 0xa7, 0x12, 0x6a, 0x52, 0xe7, 0x9d, 0x22, 0x5a, 0x2a,
+	0x7b, 0x84, 0x8b, 0x65, 0x8f, 0x6c, 0x06, 0x2e, 0xd5, 0x0c, 0x35, 0x8a, 0x13, 0x3c, 0x33, 0x29,
+	0x91, 0x1a, 0xa3, 0x76, 0x42, 0x22, 0xfe, 0x0c, 0x36, 0x82, 0xf0, 0xcd, 0xb3, 0x91, 0xca, 0xde,
+	0x76, 0xd6, 0xa1, 0x87, 0x08, 0x1e, 0xfc, 0x2b, 0x68, 0x98, 0xb6, 0xe9, 0x9b, 0x9a, 0x35, 0x70,
+	0xbc, 0x20, 0x73, 0xaf, 0x71, 0x57, 0xbd, 0xbb, 0x66, 0x15, 0xbb, 0xe9, 0x5e, 0xcf, 0x37, 0x0e,
+	0x35, 0x9f, 0x7a, 0x3e, 0x99, 0x17, 0x87, 0xbf, 0x84, 0x1b, 0x71, 0xc6, 0x97, 0x44, 0x8e, 0xea,
+	0xf9, 0x9a, 0x4f, 0x79, 0x16, 0x53, 0x22, 0xd7, 0x22, 0x9e, 0x61, 0x82, 0x65, 0xc8, 0x38, 0xe4,
+	0x7d, 0xd8, 0x14, 0x98, 0x61, 0xe7, 0x18, 0xe5, 0xbd, 0x6e, 0xcd, 0x3c, 0xf3, 0x2c, 0x3c, 0x64,
+	0x9d, 0x68, 0x2e, 0x35, 0x90, 0xc4, 0xce, 0x34, 0x2f, 0x34, 0xd3, 0x72, 0xce, 0xa8, 0x8b, 0x72,
+	0xb8, 0x0e, 0xf0, 0x9a, 0x9e, 0xab, 0xa2, 0x35, 0x2f, 0xff, 0x18, 0x1a, 0x73, 0x1a, 0xb3, 0xce,
+	0x81, 0xce, 0xe8, 0x12, 0xeb, 0xac, 0x68, 0xae, 0x65, 0xb2, 0x37, 0x49, 0xfe, 0x2f, 0x09, 0x6e,
+	0x89, 0x09, 0x0f, 0xc2, 0xd0, 0x49, 0x8d, 0x11, 0xdb, 0x12, 0x51, 0x32, 0x91, 0xbd, 0x61, 0xd2,
+	0x48, 0xcd, 0xcd, 0x23, 0x35, 0x3b, 0x6a, 0xe4, 0x3f, 0x2c, 0x6a, 0x14, 0x3e, 0x30, 0x6a, 0x14,
+	0x97, 0x46, 0x8d, 0x7f, 0xcf, 0xc1, 0x8f, 0xd6, 0xcc, 0x33, 0xf2, 0xda, 0x37, 0x01, 0xa2, 0x34,
+	0xc2, 0xe3, 0x6e, 0xa7, 0x46, 0x12, 0x94, 0x75, 0x33, 0xff, 0x45, 0xc2, 0x9b, 0xe7, 0x39, 0xc0,
+	0xbe, 0xcc, 0x04, 0xd8, 0x3a, 0x3d, 0x76, 0x0f, 0x1d, 0xe7, 0x74, 0x36, 0xe5, 0x0e, 0x24, 0xf6,
+	0xfb, 0x7f, 0x02, 0x45, 0xea, 0xba, 0x8e, 0xcb, 0x6d, 0xb3, 0x78, 0xce, 0xe7, 0x5e, 0x5b, 0x61,
+	0x0c, 0x24, 0xe0, 0x63, 0x87, 0x58, 0xb1, 0x81, 0x85, 0x79, 0xc2, 0x57, 0xf9, 0x1e, 0x40, 0x3c,
+	0x04, 0xae, 0x30, 0xe8, 0xe9, 0x3a, 0xf5, 0xbc, 0x00, 0x6d, 0x0c, 0x61, 0x0c, 0x6d, 0xf2, 0xaf,
+	0x73, 0x80, 0x85, 0xca, 0x82, 0x9d, 0xaf, 0xff, 0xf7, 0x42, 0xc5, 0x8f, 0xa1, 0xc6, 0xd6, 0x8b,
+	0x79, 0x21, 0xcd, 0x37, 0xcf, 0x02, 0x03, 0x45, 0xd9, 0x5b, 0xba, 0x6d, 0x09, 0x84, 0x0a, 0x1f,
+	0x06, 0xa1, 0xe2, 0x07, 0x42, 0x68, 0x63, 0x29, 0x84, 0xfe, 0x23, 0x0f, 0xd7, 0x16, 0xed, 0x10,
+	0xa1, 0xe6, 0x01, 0xa0, 0x20, 0x99, 0x62, 0x6b, 0x60, 0xea, 0xf4, 0xc8, 0xb5, 0x44, 0xc8, 0x5a,
+	0xa0, 0xe3, 0x47, 0xb0, 0x35, 0x4f, 0x1b, 0x59, 0x9e, 0x48, 0xb6, 0xb3, 0x9a, 0x70, 0x7f, 0x01,
+	0x54, 0x4f, 0x32, 0x41, 0x95, 0xa1, 0x59, 0x36, 0x8e, 0xd2, 0x0b, 0x55, 0x58, 0xbb, 0x50, 0xc5,
+	0x15, 0x0b, 0x15, 0x61, 0x72, 0xe3, 0xc3, 0x31, 0xb9, 0x99, 0xc2, 0x24, 0x4f, 0xf5, 0x83, 0xdc,
+	0xf4, 0xc4, 0x75, 0x66, 0xe3, 0x13, 0xd5, 0x0b, 0xcc, 0xc0, 0x33, 0xd4, 0x52, 0x3a, 0xd5, 0xe7,
+	0x89, 0x6a, 0xc0, 0x16, 0x1b, 0x4b, 0x7e, 0x92, 0x42, 0x75, 0x15, 0x4a, 0x84, 0x1a, 0xa6, 0x4b,
+	0x75, 0xe6, 0xfb, 0x2a, 0xb0, 0x29, 0x72, 0x46, 0x24, 0x25, 0x30, 0x9e, 0x93, 0xff, 0x31, 0x07,
+	0x8d, 0x70, 0x5b, 0x8a, 0x6a, 0xc4, 0x12, 0x80, 0xdf, 0x82, 0x4a, 0x54, 0xc4, 0x88, 0xeb, 0x13,
+	0x21, 0x69, 0x21, 0x82, 0xe7, 0x33, 0x22, 0x78, 0xba, 0x08, 0x52, 0x10, 0x27, 0xac, 0x64, 0x11,
+	0xe4, 0x0e, 0x94, 0xc5, 0x01, 0x96, 0x1a, 0x69, 0xcb, 0xc7, 0xf4, 0x54, 0x60, 0xdd, 0xb8, 0x60,
+	0x60, 0x8d, 0x23, 0xe6, 0xe6, 0xfa, 0x88, 0x29, 0xcf, 0xa0, 0x12, 0x06, 0x43, 0x6a, 0x1b, 0xf3,
+	0x53, 0x97, 0x16, 0xa6, 0xbe, 0xb6, 0x76, 0x73, 0x17, 0xaa, 0xc9, 0xc2, 0x43, 0x98, 0x0c, 0x3e,
+	0x26, 0x95, 0x44, 0xbd, 0x41, 0xfe, 0x07, 0x29, 0x72, 0x38, 0x6c, 0x5c, 0x42, 0x75, 0x6a, 0x4e,
+	0xfd, 0x3f, 0xc2, 0xf0, 0xcf, 0x01, 0x12, 0xb9, 0x4c, 0x7e, 0x7d, 0x2e, 0x53, 0x9e, 0x84, 0xaf,
+	0xf2, 0x6f, 0xe2, 0x63, 0x07, 0x53, 0x8a, 0xc3, 0xf9, 0x8f, 0xa0, 0x52, 0xb4, 0x75, 0xf2, 0x99,
+	0x65, 0xdb, 0x95, 0x5b, 0xa7, 0xc0, 0x71, 0x19, 0xb9, 0xf3, 0x7f, 0x92, 0xa2, 0x03, 0xac, 0x98,
+	0xc5, 0x7c, 0xba, 0x29, 0x2d, 0xa4, 0x9b, 0x69, 0x8b, 0x30, 0xf5, 0x2e, 0x6c, 0x11, 0x96, 0xdd,
+	0xbb, 0x54, 0x54, 0x2c, 0xce, 0x55, 0xdd, 0x99, 0xd9, 0x3e, 0xb7, 0x29, 0x2f, 0x3a, 0x35, 0xe2,
+	0xa6, 0x0e, 0x6b, 0x91, 0xff, 0x37, 0x0f, 0x10, 0x9e, 0x40, 0xf4, 0xd3, 0xf5, 0x9a, 0xfd, 0x14,
+	0x4a, 0x9a, 0x7e, 0x1a, 0x54, 0x3e, 0x72, 0xdc, 0x36, 0xad, 0x4c, 0x87, 0xd7, 0xd6, 0x4f, 0x77,
+	0xdb, 0xfa, 0x69, 0x90, 0x66, 0x6b, 0xc1, 0xc3, 0xc2, 0x42, 0xe7, 0x3f, 0x60, 0x5a, 0x43, 0x40,
+	0x67, 0x9a, 0x65, 0x1a, 0x1a, 0x4f, 0xdc, 0x92, 0xb1, 0x76, 0x67, 0xa9, 0x02, 0x6f, 0xa3, 0x0e,
+	0xc1, 0x5a, 0x35, 0xce, 0xd2, 0x04, 0xa6, 0xd0, 0x42, 0xb5, 0xfd, 0xda, 0xc2, 0x6e, 0x8d, 0x8a,
+	0xba, 0xa9, 0x8a, 0xfb, 0xa7, 0xb0, 0x29, 0x26, 0xc8, 0x52, 0xbd, 0xae, 0x6d, 0x98, 0x67, 0xa6,
+	0x31, 0xd3, 0x2c, 0x74, 0x89, 0xbd, 0x77, 0x66, 0x93, 0x99, 0xc5, 0xdd, 0x30, 0x92, 0xe4, 0xbf,
+	0x93, 0xa0, 0x31, 0xa7, 0x0b, 0xbe, 0x09, 0xd7, 0x8e, 0xe6, 0x0a, 0x80, 0x1d, 0xc7, 0x75, 0x67,
+	0x3c, 0xeb, 0x44, 0x97, 0xf0, 0x55, 0xc0, 0x07, 0x34, 0x51, 0x4d, 0xe4, 0xbd, 0x90, 0x84, 0xb7,
+	0x01, 0x75, 0x4e, 0xa8, 0x7e, 0xea, 0xcd, 0x26, 0x6f, 0x4c, 0x6f, 0xa2, 0xf9, 0xfa, 0x09, 0xca,
+	0xe1, 0x8f, 0xe1, 0x0a, 0xaf, 0x06, 0x1e, 0xd0, 0x21, 0x75, 0x4d, 0xcd, 0x32, 0xbf, 0xa3, 0x41,
+	0x87, 0x3c, 0xde, 0x82, 0xc6, 0x01, 0x0d, 0xab, 0x6e, 0x01, 0xb1, 0x20, 0x1f, 0xc3, 0xf5, 0xc8,
+	0x4e, 0x4c, 0xc9, 0x8e, 0x58, 0xe1, 0xce, 0x89, 0x66, 0x5f, 0x04, 0xa0, 0x32, 0x94, 0x4d, 0x4f,
+	0xd5, 0x78, 0x5f, 0x1e, 0x1f, 0x23, 0x4f, 0x58, 0x32, 0xbd, 0x40, 0xa4, 0xfc, 0x36, 0x72, 0x53,
+	0x2f, 0x2c, 0xe7, 0xdb, 0xf5, 0x32, 0xef, 0x43, 0x5d, 0x2c, 0xf7, 0x80, 0xba, 0x13, 0xd3, 0xf7,
+	0x38, 0xc0, 0x6a, 0x64, 0x8e, 0x2a, 0x8f, 0x22, 0x37, 0x74, 0x64, 0x7b, 0xd1, 0xf1, 0x71, 0xad,
+	0xf8, 0xd5, 0x29, 0x90, 0xfc, 0x5b, 0x29, 0xe1, 0x55, 0xe9, 0xe9, 0x0f, 0x95, 0xf7, 0x43, 0x9c,
+	0x1a, 0x4b, 0x82, 0xc2, 0xbe, 0xa9, 0x9a, 0x3a, 0xc3, 0x7b, 0x81, 0xe0, 0xd0, 0x1e, 0x71, 0x69,
+	0x5d, 0xfe, 0x29, 0x34, 0x85, 0xf2, 0x84, 0x6a, 0xfa, 0x09, 0x35, 0x14, 0xdb, 0xe8, 0xbf, 0x1b,
+	0x85, 0xa1, 0x71, 0xe5, 0x4c, 0xe4, 0xb7, 0x51, 0x11, 0xa2, 0x63, 0x39, 0x1e, 0x8d, 0x22, 0xed,
+	0x5a, 0x37, 0xba, 0xc6, 0xa4, 0x73, 0x72, 0x43, 0x8c, 0xfd, 0xe0, 0xa5, 0xfa, 0x6b, 0x09, 0xee,
+	0x47, 0xb3, 0x15, 0xee, 0xec, 0xc8, 0xd6, 0xf4, 0x53, 0xdb, 0xf9, 0x96, 0x5f, 0x12, 0x85, 0x8e,
+	0xd6, 0x5b, 0x3f, 0xd4, 0xcf, 0xa0, 0x12, 0x2f, 0x13, 0x43, 0xdc, 0x5a, 0x9f, 0x04, 0xd1, 0x3a,
+	0x79, 0xf2, 0x2f, 0x23, 0xd7, 0x2e, 0x72, 0xf4, 0x39, 0xd5, 0xa5, 0x79, 0x54, 0xc4, 0x81, 0x3e,
+	0x77, 0x81, 0x40, 0xff, 0x6f, 0x12, 0x5c, 0x9d, 0x4b, 0x7f, 0x2e, 0x38, 0xce, 0x42, 0x3a, 0x93,
+	0xcb, 0xb8, 0xd3, 0xf9, 0x0c, 0x90, 0xa5, 0x79, 0xbe, 0x9a, 0x0c, 0x85, 0x0c, 0xa8, 0x79, 0x7e,
+	0x21, 0x56, 0x67, 0x6d, 0xc3, 0x38, 0x24, 0x2e, 0x96, 0xea, 0x0b, 0x19, 0xa5, 0x7a, 0xf9, 0x3d,
+	0x54, 0x85, 0xca, 0x81, 0x9f, 0x5b, 0xa3, 0x68, 0x14, 0x68, 0x73, 0x1f, 0x1e, 0x68, 0xf3, 0xe9,
+	0x40, 0x5b, 0x8b, 0x36, 0xf0, 0xc0, 0xb4, 0xc7, 0xc9, 0x57, 0xc7, 0x1e, 0x27, 0xc1, 0x28, 0x56,
+	0x9f, 0x1d, 0xed, 0xd7, 0x1a, 0x72, 0x5d, 0x65, 0x48, 0xfe, 0xef, 0x02, 0xdc, 0xc8, 0x12, 0x4c,
+	0xb2, 0x33, 0xfa, 0x85, 0x01, 0x3e, 0x07, 0xe0, 0x13, 0x53, 0x75, 0xc7, 0xa0, 0xa2, 0x48, 0xbd,
+	0xc2, 0x0a, 0x65, 0xce, 0xdc, 0x71, 0x0c, 0x96, 0x8d, 0xd6, 0x82, 0x9e, 0xb1, 0x3d, 0x78, 0xca,
+	0xca, 0x89, 0x61, 0xaa, 0x71, 0x13, 0x60, 0xe2, 0x8d, 0x89, 0xe6, 0xd3, 0xbe, 0x28, 0xf7, 0x4b,
+	0x24, 0x41, 0x61, 0xc7, 0xa3, 0x89, 0x37, 0x16, 0xe9, 0xfa, 0x74, 0xe6, 0x33, 0xae, 0x22, 0xe7,
+	0x5a, 0xa0, 0x0b, 0x5e, 0xd6, 0x33, 0xda, 0x76, 0xfc, 0x68, 0x11, 0xf0, 0xa6, 0xe8, 0x58, 0x86,
+	0x54, 0xf1, 0x4b, 0x9c, 0x27, 0xd2, 0x05, 0xb1, 0x07, 0x80, 0xb4, 0x33, 0xcd, 0xb4, 0xb4, 0x63,
+	0x2b, 0x72, 0xf9, 0x25, 0xee, 0xe2, 0x16, 0xe8, 0x78, 0x07, 0x1a, 0x33, 0xb6, 0xc5, 0xe3, 0xbd,
+	0xcd, 0x8b, 0x5e, 0x05, 0x32, 0x4f, 0xc6, 0xfb, 0x70, 0xe3, 0xd8, 0x72, 0x18, 0x29, 0x5c, 0x8f,
+	0xbe, 0x7d, 0x24, 0x78, 0xbc, 0xb1, 0xd7, 0x04, 0x5e, 0xed, 0x59, 0xc9, 0xc3, 0x40, 0xa6, 0x19,
+	0x06, 0x8b, 0xbc, 0xbc, 0xc2, 0x55, 0x26, 0xe1, 0x2b, 0x0b, 0x52, 0x7a, 0x58, 0xde, 0x1e, 0x9a,
+	0xb6, 0x1e, 0xdc, 0xb8, 0x95, 0xc9, 0x1c, 0x15, 0x63, 0x71, 0xed, 0x5f, 0xe3, 0xad, 0xc1, 0xdd,
+	0x3e, 0x0b, 0x70, 0x81, 0x9d, 0x94, 0xf7, 0x53, 0xd3, 0xa5, 0x06, 0xaf, 0x3c, 0x49, 0x64, 0x8e,
+	0x2a, 0xd6, 0x6c, 0x5f, 0xd3, 0x4f, 0x2d, 0x67, 0xcc, 0x6f, 0xce, 0x0a, 0x24, 0x41, 0x91, 0xbf,
+	0x86, 0x8f, 0x04, 0xe2, 0x5e, 0x52, 0xff, 0x50, 0xf3, 0x12, 0x55, 0xbd, 0x1f, 0xea, 0x5a, 0x7f,
+	0x1d, 0xd7, 0x9d, 0xe6, 0x65, 0x47, 0x80, 0xee, 0x40, 0x83, 0xbb, 0x8d, 0x44, 0x78, 0x93, 0xd6,
+	0x67, 0xa8, 0x35, 0x2b, 0xa5, 0xe8, 0x1a, 0x3d, 0xfe, 0x53, 0x8a, 0x12, 0x94, 0x97, 0xd4, 0xe7,
+	0x71, 0xcc, 0xeb, 0xbf, 0x63, 0xa8, 0xf1, 0xa6, 0x9a, 0xbe, 0x76, 0x53, 0xdd, 0x80, 0xb2, 0x1d,
+	0xf2, 0x0a, 0xd7, 0x17, 0x13, 0x70, 0x0f, 0x0a, 0x13, 0xb6, 0xd9, 0xf2, 0x2b, 0xca, 0x8c, 0x59,
+	0xa3, 0xee, 0xbe, 0x71, 0x0c, 0xfa, 0x1c, 0x06, 0x0a, 0x19, 0x76, 0x87, 0x23, 0xa5, 0x37, 0x22,
+	0x5c, 0x8e, 0xfc, 0x04, 0x0a, 0xac, 0x85, 0xa5, 0x7d, 0x71, 0x1b, 0xba, 0x84, 0x31, 0xd4, 0x7b,
+	0xfd, 0x9e, 0x9a, 0xa0, 0x49, 0x78, 0x13, 0xf2, 0xed, 0xc3, 0x43, 0x94, 0x93, 0x7f, 0x01, 0x77,
+	0x56, 0x0c, 0x75, 0x51, 0xef, 0x71, 0x15, 0x36, 0xf8, 0xf9, 0x37, 0x88, 0x5c, 0x65, 0x22, 0xde,
+	0x64, 0x3b, 0x3a, 0x15, 0xbd, 0xa4, 0xbe, 0xf8, 0x7c, 0x65, 0x8d, 0xa8, 0xe8, 0x5c, 0x9d, 0x4b,
+	0x9e, 0xab, 0x17, 0xbd, 0x7e, 0x3e, 0xcb, 0xeb, 0xff, 0x41, 0x8a, 0x12, 0x90, 0x68, 0xc0, 0xff,
+	0x27, 0x1e, 0x30, 0x0e, 0xb9, 0x85, 0x0b, 0x54, 0xa3, 0x17, 0xe7, 0x5b, 0xcc, 0x9a, 0xef, 0xff,
+	0x5c, 0x83, 0xca, 0xbe, 0xc6, 0x72, 0x1a, 0x3e, 0x67, 0xbc, 0x27, 0xb6, 0xbb, 0xc4, 0xa3, 0xd8,
+	0xcd, 0xf4, 0x10, 0x09, 0xc6, 0xf4, 0xa7, 0x3e, 0x9b, 0xc2, 0x69, 0x88, 0x64, 0xe0, 0x46, 0x26,
+	0x12, 0x45, 0x65, 0x84, 0x84, 0xcc, 0xf8, 0x67, 0x50, 0x8e, 0x9c, 0x8d, 0x48, 0x2c, 0x6f, 0xae,
+	0xea, 0x49, 0x0d, 0x12, 0x77, 0x60, 0xbd, 0xa3, 0xa4, 0x59, 0x58, 0xe4, 0xe6, 0xea, 0x42, 0x3b,
+	0x89, 0x3b, 0xe0, 0x2f, 0xa0, 0x14, 0xa6, 0x10, 0xdc, 0x30, 0x95, 0x8c, 0xcf, 0x22, 0x92, 0xe9,
+	0x0a, 0x89, 0xd8, 0xf1, 0x43, 0x28, 0x78, 0xd4, 0x0e, 0x2a, 0x79, 0x95, 0xf9, 0x05, 0x4e, 0xd6,
+	0x15, 0x38, 0x1b, 0xee, 0x40, 0x95, 0xfd, 0xaa, 0x6e, 0x50, 0x66, 0x10, 0x85, 0x91, 0xd6, 0xf2,
+	0x6e, 0x01, 0x1f, 0xa9, 0x78, 0x89, 0xda, 0xc4, 0x9f, 0x01, 0x70, 0x21, 0x41, 0x8a, 0x51, 0x5a,
+	0x35, 0xdb, 0xb0, 0x78, 0x40, 0xca, 0x5e, 0x54, 0x47, 0x78, 0x16, 0xe7, 0x1a, 0xe5, 0x15, 0x2b,
+	0x24, 0x90, 0x16, 0x57, 0xcb, 0x1e, 0x40, 0x5e, 0xd3, 0x4f, 0x79, 0xa4, 0xa9, 0xcc, 0xdf, 0x38,
+	0xc7, 0xc7, 0x53, 0xc2, 0x98, 0x98, 0x59, 0xde, 0x59, 0xce, 0xb7, 0x3c, 0xce, 0x2c, 0x33, 0x0b,
+	0x3b, 0x3f, 0x11, 0xce, 0x86, 0xf7, 0xa1, 0x32, 0x8b, 0x4f, 0x3d, 0xe2, 0x82, 0x25, 0xdb, 0x2a,
+	0x89, 0xd3, 0x11, 0x49, 0x76, 0x62, 0xd3, 0xf2, 0x82, 0x34, 0x92, 0x87, 0xa7, 0x65, 0xd3, 0x12,
+	0xa9, 0x26, 0x09, 0x99, 0xf1, 0xa3, 0x30, 0x57, 0xab, 0xf3, 0x5e, 0xd7, 0x32, 0x7b, 0xa5, 0x92,
+	0xb5, 0x2e, 0xd4, 0x75, 0x96, 0xfa, 0xab, 0x11, 0x68, 0x1a, 0xbc, 0xab, 0x9c, 0x8d, 0xd7, 0xe4,
+	0xe9, 0x83, 0xd4, 0xf4, 0xd4, 0x61, 0x24, 0x12, 0x15, 0x06, 0x33, 0xfe, 0xa5, 0xc8, 0x4a, 0x51,
+	0x61, 0x6c, 0x17, 0xa2, 0xa2, 0xf3, 0x47, 0x9f, 0xdf, 0x7f, 0x06, 0xc9, 0x71, 0x68, 0x88, 0xcb,
+	0x5c, 0xd8, 0xdd, 0x95, 0x60, 0x0e, 0x0d, 0xd2, 0x98, 0xce, 0x25, 0xe3, 0x0f, 0xa1, 0x30, 0x35,
+	0xed, 0x31, 0xff, 0xa8, 0x64, 0xd9, 0x1a, 0xb2, 0x9c, 0x94, 0x70, 0x36, 0xce, 0xee, 0xd8, 0x63,
+	0xfe, 0x75, 0xc9, 0x52, 0x76, 0x87, 0xb3, 0x3b, 0xf6, 0x18, 0xff, 0x15, 0xdc, 0x72, 0x57, 0x1f,
+	0x73, 0xf8, 0x37, 0x28, 0x95, 0xbd, 0xa7, 0x99, 0x92, 0xd6, 0x1c, 0x91, 0xc8, 0x3a, 0xe1, 0xf8,
+	0x2f, 0xe0, 0x72, 0x74, 0xcb, 0x12, 0x5e, 0x86, 0x34, 0xaf, 0xf0, 0x11, 0x1f, 0x7e, 0xd8, 0x0d,
+	0xca, 0xa2, 0x1c, 0xec, 0x25, 0xbe, 0x21, 0x99, 0xbf, 0x69, 0x69, 0x5e, 0xe5, 0x83, 0xfc, 0xe4,
+	0x7b, 0x5d, 0xd3, 0x90, 0xe5, 0x72, 0xd9, 0x26, 0xb2, 0xe2, 0x82, 0x7c, 0xf3, 0xa3, 0x15, 0x9b,
+	0x28, 0x59, 0xb8, 0x4f, 0x76, 0xc2, 0xdf, 0xc0, 0x96, 0xb5, 0x58, 0xd4, 0x6f, 0x36, 0xb9, 0xac,
+	0x9d, 0x8b, 0x5e, 0x02, 0x90, 0x2c, 0x21, 0xf8, 0x55, 0x7c, 0x9d, 0xcc, 0xcf, 0x12, 0xcd, 0x8f,
+	0x57, 0x41, 0x3d, 0x75, 0xea, 0x48, 0x77, 0xc4, 0xbf, 0x82, 0x2b, 0x7a, 0xd6, 0xa9, 0xa4, 0x79,
+	0x8d, 0x4b, 0x7c, 0x70, 0x01, 0x89, 0xa1, 0xa6, 0xd9, 0x82, 0xf0, 0x08, 0x2e, 0xbb, 0xf3, 0x25,
+	0x87, 0xe6, 0x75, 0x2e, 0xfd, 0xfe, 0x12, 0x3c, 0xce, 0x71, 0x93, 0x45, 0x01, 0x41, 0xb0, 0xa0,
+	0xa7, 0xcd, 0x1b, 0x2b, 0x83, 0x05, 0x3d, 0x25, 0x9c, 0x0d, 0x7f, 0x05, 0x68, 0x3c, 0x97, 0xae,
+	0x36, 0x3f, 0xe1, 0x5d, 0xef, 0x2d, 0xcb, 0xee, 0xd2, 0xb9, 0xed, 0x42, 0x77, 0x6c, 0x42, 0x73,
+	0xbc, 0x24, 0x03, 0x6e, 0xde, 0x5c, 0x01, 0xfe, 0x65, 0x69, 0x33, 0x59, 0x2a, 0x0e, 0xab, 0x70,
+	0x35, 0xa8, 0xa4, 0x45, 0xbe, 0x4d, 0xd5, 0x79, 0x1d, 0xae, 0x79, 0x8b, 0x0f, 0xf4, 0xe9, 0x92,
+	0x08, 0xb2, 0x58, 0xb8, 0x23, 0xdb, 0x5a, 0x56, 0x39, 0xef, 0x97, 0xb0, 0x3d, 0xce, 0x48, 0x32,
+	0x9b, 0xad, 0x15, 0xe2, 0x33, 0xb3, 0xd2, 0x4c, 0x31, 0x78, 0x06, 0x37, 0xc6, 0x2b, 0x72, 0xd8,
+	0xe6, 0x6d, 0x3e, 0xcc, 0xe3, 0x8b, 0x0f, 0x13, 0x9a, 0x6c, 0xa5, 0x58, 0x96, 0xc9, 0x8c, 0xc3,
+	0x5c, 0xb3, 0x29, 0xaf, 0x88, 0xed, 0x71, 0x46, 0x1a, 0x77, 0x60, 0xb8, 0x1d, 0xcf, 0x67, 0xaa,
+	0xcd, 0x3b, 0x2b, 0x70, 0xbb, 0x90, 0xd7, 0x92, 0x45, 0x01, 0x6c, 0xe7, 0x6a, 0xc9, 0x0f, 0x79,
+	0x9a, 0x77, 0x57, 0xec, 0xdc, 0xd4, 0x27, 0x3f, 0x24, 0xdd, 0x11, 0x2b, 0x50, 0xd5, 0x12, 0xdf,
+	0x2c, 0x35, 0xef, 0x71, 0x41, 0xb7, 0x97, 0x0a, 0x8a, 0xb4, 0x4a, 0x75, 0x93, 0x7f, 0x57, 0x14,
+	0xdf, 0x85, 0x57, 0x60, 0xb3, 0xd3, 0xef, 0xf5, 0x94, 0xce, 0x08, 0xe5, 0x70, 0x0d, 0xca, 0xe2,
+	0x45, 0x39, 0x40, 0x79, 0xf6, 0x3a, 0x3c, 0xda, 0x1f, 0x76, 0x48, 0x77, 0x5f, 0x41, 0x05, 0xfe,
+	0x89, 0x38, 0xe9, 0x1f, 0x1c, 0x75, 0x14, 0x12, 0x7c, 0x0e, 0x3e, 0x54, 0x7a, 0x07, 0x68, 0x03,
+	0x23, 0xa8, 0xb2, 0x27, 0x95, 0x28, 0x1d, 0xa5, 0x3b, 0x18, 0xa1, 0x4d, 0x76, 0xe2, 0xe1, 0x14,
+	0x85, 0x90, 0x3e, 0x41, 0x25, 0x36, 0xc8, 0x1b, 0x65, 0x38, 0x6c, 0xbf, 0x54, 0x50, 0x99, 0x1f,
+	0x75, 0x3a, 0xaf, 0x11, 0x30, 0x09, 0x2f, 0x0e, 0xfb, 0x3f, 0x47, 0x15, 0xdc, 0x80, 0xca, 0x51,
+	0x2f, 0x1e, 0xaa, 0xca, 0x6f, 0xb7, 0x8f, 0x3a, 0x1d, 0x65, 0x38, 0x44, 0x35, 0x5c, 0x86, 0x62,
+	0x20, 0xa8, 0xce, 0x8e, 0x4e, 0x9d, 0xc3, 0xfe, 0x50, 0x51, 0x23, 0x45, 0x1a, 0x31, 0xad, 0xd3,
+	0xef, 0x0d, 0x8f, 0xde, 0x28, 0x04, 0x21, 0xbc, 0x0d, 0x28, 0xe4, 0x50, 0x43, 0x41, 0x97, 0xd9,
+	0x80, 0x83, 0x6e, 0xef, 0x25, 0xc2, 0xfc, 0xa9, 0xdf, 0x7b, 0x89, 0xb6, 0xf0, 0x3d, 0xb8, 0x4d,
+	0x94, 0x03, 0xe5, 0xb0, 0xfb, 0x56, 0x21, 0xea, 0x51, 0xaf, 0xdd, 0x79, 0xdd, 0xeb, 0xff, 0xfc,
+	0x50, 0x39, 0x78, 0xa9, 0x1c, 0xa8, 0x42, 0xe7, 0x21, 0xda, 0xc6, 0x4d, 0xd8, 0x1e, 0xb4, 0xc9,
+	0xa8, 0x3b, 0xea, 0xf6, 0x7b, 0xbc, 0x65, 0xd4, 0x3e, 0x68, 0x8f, 0xda, 0xe8, 0x0a, 0xbe, 0x0d,
+	0x9f, 0x64, 0xb5, 0xa8, 0x44, 0x19, 0x0e, 0xfa, 0xbd, 0xa1, 0x82, 0xae, 0xf2, 0xef, 0x39, 0xfa,
+	0xfd, 0xd7, 0x47, 0x03, 0xf4, 0x11, 0xde, 0x82, 0x46, 0xf0, 0x1c, 0x33, 0x34, 0xf9, 0x14, 0x84,
+	0xf2, 0xea, 0x70, 0xd4, 0x1e, 0x0d, 0xd1, 0xc7, 0xf8, 0x3a, 0x7c, 0x94, 0xa6, 0xc5, 0x1d, 0xae,
+	0x31, 0x75, 0x88, 0xd2, 0xee, 0xbc, 0x52, 0x0e, 0x54, 0x66, 0xe7, 0xfe, 0x0b, 0x75, 0xd4, 0x1f,
+	0x74, 0x3b, 0xe8, 0x7a, 0xb0, 0x2c, 0xca, 0x6b, 0x74, 0x03, 0x7f, 0x04, 0x5b, 0x2f, 0x95, 0x91,
+	0x7a, 0xd8, 0x1e, 0x8e, 0xc2, 0x99, 0xa8, 0xdd, 0x03, 0xf4, 0x09, 0x6e, 0xc1, 0x8d, 0x8c, 0x86,
+	0x58, 0xfc, 0x4d, 0x7c, 0x0d, 0xae, 0xb6, 0x3b, 0xa3, 0xee, 0xdb, 0xd8, 0xa6, 0x6a, 0xe7, 0x55,
+	0xbb, 0xf7, 0x52, 0x41, 0xb7, 0x98, 0x5e, 0xac, 0x37, 0x1f, 0x6f, 0xc8, 0x46, 0xee, 0xb5, 0xdf,
+	0x28, 0xc3, 0x41, 0xbb, 0xa3, 0xa0, 0x16, 0xbe, 0x0b, 0xad, 0x25, 0x8d, 0xb1, 0xf8, 0xdb, 0x0c,
+	0x1e, 0x8c, 0x6b, 0xd8, 0x79, 0xa5, 0xbc, 0x69, 0x23, 0x39, 0xd4, 0x34, 0x78, 0x8f, 0x19, 0xef,
+	0x30, 0xbb, 0xb4, 0x8f, 0x46, 0xaf, 0xd8, 0xe0, 0x87, 0x87, 0x0a, 0x1b, 0xff, 0x2e, 0xbe, 0x0c,
+	0x35, 0x4e, 0x8b, 0xd8, 0xee, 0x3d, 0x38, 0xe0, 0xb7, 0xc1, 0xc9, 0xcf, 0xae, 0xf9, 0xdf, 0x1d,
+	0xfa, 0x3d, 0x05, 0x5d, 0x62, 0x70, 0x3b, 0xfc, 0xe6, 0x69, 0xf0, 0x5f, 0x87, 0x6f, 0x0e, 0xbb,
+	0xfb, 0x28, 0xc7, 0x9f, 0x86, 0x23, 0x86, 0x70, 0x80, 0x8d, 0x61, 0xaf, 0x3d, 0x18, 0x7c, 0x8d,
+	0x0a, 0x0f, 0xfe, 0x90, 0x87, 0x4a, 0xe2, 0x40, 0xc9, 0x60, 0x7d, 0x64, 0xb3, 0xbc, 0x47, 0xdc,
+	0x8e, 0x5c, 0x62, 0x43, 0x87, 0x39, 0x43, 0xe2, 0xda, 0x65, 0xc0, 0xce, 0x7e, 0x9e, 0x4f, 0x6d,
+	0x5d, 0xdc, 0xad, 0xe4, 0xd8, 0x84, 0xd8, 0xde, 0xa3, 0xb6, 0x6f, 0xea, 0xf1, 0xdd, 0x0e, 0xca,
+	0xe3, 0xab, 0x80, 0xdb, 0xc1, 0x5d, 0xfc, 0x77, 0x09, 0x7a, 0x81, 0x8d, 0x15, 0xfa, 0xe6, 0xfd,
+	0x99, 0x77, 0x8e, 0x8a, 0x0c, 0x27, 0xe2, 0x96, 0xbc, 0xe7, 0xf8, 0x84, 0x6a, 0xc6, 0x39, 0xda,
+	0x60, 0x60, 0x0d, 0x93, 0xce, 0xfd, 0xa0, 0x4e, 0xf5, 0xd5, 0xcc, 0xf1, 0x35, 0xe5, 0xbd, 0x4e,
+	0xa9, 0x41, 0x83, 0x1c, 0x1b, 0x6d, 0xe2, 0x4f, 0xe1, 0xde, 0x4a, 0xb6, 0xf7, 0x3a, 0x0d, 0xae,
+	0x93, 0x4a, 0x6c, 0x4a, 0xe1, 0xb5, 0x51, 0xd0, 0xbb, 0xcc, 0x16, 0x98, 0x1d, 0x11, 0xa6, 0x53,
+	0xc7, 0xf5, 0xa9, 0x21, 0x4e, 0xb6, 0x41, 0x23, 0x30, 0x7e, 0xee, 0x79, 0x7b, 0x8e, 0xff, 0xc2,
+	0x99, 0xd9, 0x06, 0xaa, 0x30, 0x2c, 0x26, 0xbf, 0x8c, 0x8a, 0x5a, 0xaa, 0xfc, 0x4e, 0x2a, 0x2c,
+	0xec, 0x85, 0xd4, 0x1a, 0x9b, 0xd9, 0xc8, 0x71, 0xde, 0x68, 0xf6, 0x39, 0x09, 0xce, 0xfa, 0x1e,
+	0xaa, 0x33, 0x21, 0x5c, 0xee, 0x88, 0xba, 0x13, 0xd3, 0xd6, 0xfc, 0x70, 0x32, 0x0d, 0x66, 0x9a,
+	0x68, 0x32, 0xcc, 0x34, 0x7c, 0x73, 0x77, 0x6d, 0x7e, 0x65, 0x17, 0xa8, 0xa2, 0x4d, 0x28, 0xba,
+	0xcc, 0x4c, 0xdb, 0xe5, 0x17, 0x67, 0x9a, 0x6f, 0x1e, 0x5b, 0x34, 0x70, 0xc0, 0x08, 0xb3, 0xb5,
+	0x08, 0x95, 0x68, 0x7b, 0x9e, 0x39, 0x16, 0x53, 0xd9, 0x7a, 0xf0, 0x1a, 0x20, 0xfe, 0x5c, 0x84,
+	0x43, 0x2d, 0xfe, 0x54, 0x31, 0xf8, 0xa7, 0xcc, 0x16, 0x34, 0x62, 0xda, 0xd7, 0xba, 0xf6, 0xf6,
+	0x71, 0xb0, 0xe2, 0x31, 0xb1, 0xcd, 0x16, 0xd9, 0x43, 0xb9, 0x07, 0x7f, 0x2b, 0x41, 0x63, 0x30,
+	0xf7, 0x41, 0xea, 0x06, 0xe4, 0xce, 0x1e, 0xa1, 0x4b, 0xfc, 0x97, 0xf5, 0x64, 0xbf, 0x7b, 0x28,
+	0xc7, 0x7f, 0x9f, 0xa0, 0x3c, 0xff, 0x7d, 0x8a, 0x0a, 0xfc, 0xf7, 0x27, 0xa8, 0xc8, 0x7f, 0x9f,
+	0xa1, 0x0d, 0xfe, 0xfb, 0xa7, 0x68, 0x93, 0xff, 0x7e, 0x8e, 0x4a, 0xfc, 0xf7, 0x8b, 0xc0, 0x71,
+	0x9e, 0x3d, 0x7e, 0x84, 0x20, 0x78, 0x78, 0x8c, 0x2a, 0xc1, 0xc3, 0x1e, 0xaa, 0x06, 0x0f, 0x4f,
+	0x50, 0x2d, 0x78, 0x78, 0x8a, 0xea, 0xfb, 0xf7, 0x41, 0x76, 0xdc, 0xf1, 0xae, 0x36, 0x65, 0x29,
+	0x54, 0x18, 0x1d, 0x74, 0x67, 0x32, 0x71, 0xec, 0x5d, 0x2d, 0xfc, 0x4f, 0xd3, 0xab, 0xfc, 0xff,
+	0x05, 0x00, 0x00, 0xff, 0xff, 0x47, 0xe6, 0x3b, 0xba, 0xe7, 0x34, 0x00, 0x00,
 }
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index d45253d..de190e0 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -42,6 +42,10 @@
 	// Failover subscription mode, multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
 	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
 	Failover
+
+	// KeyShared subscription mode, multiple consumer will be able to use the same subscription and all messages with the same key
+	// will be dispatched to only one consumer
+	KeyShared
 )
 
 type InitialPosition int
@@ -135,6 +139,9 @@
 	// This calls blocks until a message is available.
 	Receive(context.Context) (Message, error)
 
+	// ReceiveAsync appends the message to the msgs channel asynchronously.
+	ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
+
 	// Ack the consumption of a single message
 	Ack(Message) error
 
@@ -170,5 +177,5 @@
 	// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
 	// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
 	// breaks, the messages are redelivered after reconnect.
-	RedeliverUnackedMessages()
+	RedeliverUnackedMessages() error
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
new file mode 100644
index 0000000..8eeb784
--- /dev/null
+++ b/pulsar/consumer_test.go
@@ -0,0 +1,341 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"context"
+	"fmt"
+	"github.com/stretchr/testify/assert"
+	"log"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+)
+
+var (
+	adminURL  = "http://localhost:8080"
+	lookupURL = "pulsar://localhost:6650"
+)
+
+func TestProducerConsumer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "my-topic"
+	ctx := context.Background()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+		// ack message
+		if err := consumer.Ack(msg); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	// unsubscribe consumer
+	if err := consumer.Unsubscribe(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func TestConsumerConnectError(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://invalid-hostname:6650",
+	})
+
+	assert.Nil(t, err)
+
+	defer client.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            "my-topic",
+		SubscriptionName: "my-subscription",
+	})
+
+	// Expect error in creating consumer
+	assert.Nil(t, consumer)
+	assert.NotNil(t, err)
+
+	assert.Equal(t, err.Error(), "connection error")
+}
+
+func TestConsumerWithInvalidConf(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	if err != nil {
+		t.Fatal(err)
+		return
+	}
+
+	defer client.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic: "my-topic",
+	})
+
+	// Expect error in creating cosnumer
+	assert.Nil(t, consumer)
+	assert.NotNil(t, err)
+
+	fmt.Println(err.Error())
+	assert.Equal(t, err.(*Error).Result(), SubscriptionNotFound)
+
+	consumer, err = client.Subscribe(ConsumerOptions{
+		SubscriptionName: "my-subscription",
+	})
+
+	// Expect error in creating consumer
+	assert.Nil(t, consumer)
+	assert.NotNil(t, err)
+
+	assert.Equal(t, err.(*Error).Result(), TopicNotFound)
+}
+
+func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix())
+	subName := "test-subscription-initial-earliest-position"
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	//sent message
+	ctx := context.Background()
+
+	err = producer.Send(ctx, &ProducerMessage{
+		Payload: []byte("msg-1-content-1"),
+	})
+	assert.Nil(t, err)
+
+	err = producer.Send(ctx, &ProducerMessage{
+		Payload: []byte("msg-1-content-2"),
+	})
+	assert.Nil(t, err)
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topicName,
+		SubscriptionName:    subName,
+		SubscriptionInitPos: Earliest,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+}
+
+func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+	client := http.Client{}
+
+	req, err := http.NewRequest(method, urls, strings.NewReader(body))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Accept", "application/json")
+
+	res, err := client.Do(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer res.Body.Close()
+}
+
+func TestConsumerShared(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-topic-6"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, &ProducerMessage{
+			Key:     fmt.Sprintf("key-shared-%d", i%3),
+			Payload: []byte(fmt.Sprintf("value-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	time.Sleep(time.Second * 5)
+
+	go func() {
+		for i := 0; i < 10; i++ {
+			msg, err := consumer1.Receive(ctx)
+			assert.Nil(t, err)
+			if msg != nil {
+				fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
+				err = consumer1.Ack(msg)
+				assert.Nil(t, err)
+			}
+		}
+	}()
+
+	go func() {
+		for i := 0; i < 10; i++ {
+			msg2, err := consumer2.Receive(ctx)
+			assert.Nil(t, err)
+			if msg2 != nil {
+				fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
+				err = consumer2.Ack(msg2)
+				assert.Nil(t, err)
+			}
+		}
+	}()
+}
+
+func TestPartitionTopicsConsumerPubSub(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/testGetPartitions"
+	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
+
+	makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	topics, err := client.TopicPartitions(topic)
+	assert.Nil(t, err)
+	assert.Equal(t, topic+"-partition-0", topics[0])
+	assert.Equal(t, topic+"-partition-1", topics[1])
+	assert.Equal(t, topic+"-partition-2", topics[2])
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+		Type:             Exclusive,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	ctx := context.Background()
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	msgs := make([]string, 0)
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		msgs = append(msgs, string(msg.Payload()))
+
+		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+			msg.ID(), string(msg.Payload()))
+
+		if err := consumer.Ack(msg); err != nil {
+			assert.Nil(t, err)
+		}
+	}
+
+	assert.Equal(t, len(msgs), 10)
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index 8b73a95..0231913 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -59,8 +59,8 @@
 	//ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
 	//ProducerQueueIsFull                   Result = 26 // Producer queue is full
 	//MessageTooBig                         Result = 27 // Trying to send a messages exceeding the max size
-	//TopicNotFound                         Result = 28 // Topic not found
-	//SubscriptionNotFound                  Result = 29 // Subscription not found
+	TopicNotFound        Result = 28 // Topic not found
+	SubscriptionNotFound Result = 29 // Subscription not found
 	//ConsumerNotFound                      Result = 30 // Consumer not found
 	//UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
 	//TopicTerminated                       Result = 32 // Topic was already terminated
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index b73080b..aeda1c0 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -96,8 +96,12 @@
 }
 
 func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
-	// TODO: Implement consumer
-	return nil, nil
+	consumer, err := newConsumer(client, &options)
+	if err != nil {
+		return nil, err
+	}
+	client.handlers[consumer] = true
+	return consumer, nil
 }
 
 func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
new file mode 100644
index 0000000..0a44971
--- /dev/null
+++ b/pulsar/impl_consumer.go
@@ -0,0 +1,256 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/util"
+	"github.com/golang/protobuf/proto"
+
+	log "github.com/sirupsen/logrus"
+)
+
+type consumer struct {
+	topicName       string
+	consumers       []Consumer
+	log             *log.Entry
+	queue           chan ConsumerMessage
+	unackMsgTracker *UnackedMessageTracker
+}
+
+func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
+	if options == nil {
+		return nil, newError(ResultInvalidConfiguration, "consumer configuration undefined")
+	}
+
+	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
+		return nil, newError(TopicNotFound, "topic is required")
+	}
+
+	if options.SubscriptionName == "" {
+		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
+	}
+
+	if options.ReceiverQueueSize == 0 {
+		options.ReceiverQueueSize = 1000
+	}
+
+	if options.TopicsPattern != "" {
+		if options.Topics != nil {
+			return nil, newError(ResultInvalidConfiguration, "Topic names list must be null when use topicsPattern")
+		}
+		// TODO: impl logic
+	} else if options.Topics != nil && len(options.Topics) > 1 {
+		// TODO: impl logic
+	} else if options.Topics != nil && len(options.Topics) == 1 || options.Topic != "" {
+		var singleTopicName string
+		if options.Topic != "" {
+			singleTopicName = options.Topic
+		} else {
+			singleTopicName = options.Topics[0]
+		}
+		return singleTopicSubscribe(client, options, singleTopicName)
+	}
+
+	return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
+}
+
+func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
+	c := &consumer{
+		topicName: topic,
+		queue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
+	}
+
+	partitions, err := client.TopicPartitions(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	numPartitions := len(partitions)
+	c.consumers = make([]Consumer, numPartitions)
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		cons      Consumer
+	}
+
+	ch := make(chan ConsumerError, numPartitions)
+
+	for partitionIdx, partitionTopic := range partitions {
+		go func(partitionIdx int, partitionTopic string) {
+			cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx)
+			ch <- ConsumerError{
+				err:       err,
+				partition: partitionIdx,
+				cons:      cons,
+			}
+		}(partitionIdx, partitionTopic)
+	}
+
+	for i := 0; i < numPartitions; i++ {
+		ce, ok := <-ch
+		if ok {
+			err = ce.err
+			c.consumers[ce.partition] = ce.cons
+		}
+	}
+
+	if err != nil {
+		// Since there were some failures, cleanup all the partitions that succeeded in creating the consumers
+		for _, consumer := range c.consumers {
+			if !util.IsNil(consumer) {
+				if err = consumer.Close(); err != nil {
+					panic("close consumer error, please check.")
+				}
+			}
+		}
+		return nil, err
+	}
+
+	return c, nil
+}
+
+func (c *consumer) Topic() string {
+	return c.topicName
+}
+
+func (c *consumer) Subscription() string {
+	return c.consumers[0].Subscription()
+}
+
+func (c *consumer) Unsubscribe() error {
+	var errMsg string
+	for _, c := range c.consumers {
+		if err := c.Unsubscribe(); err != nil {
+			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+		}
+	}
+	if errMsg != "" {
+		return errors.New(errMsg)
+	}
+	return nil
+}
+
+func (c *consumer) Receive(ctx context.Context) (Message, error) {
+	for _, pc := range c.consumers {
+		go func(pc Consumer) {
+			if err := pc.ReceiveAsync(ctx, c.queue); err != nil {
+				return
+			}
+		}(pc)
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case msg, ok := <-c.queue:
+		if ok {
+			return msg.Message, nil
+		}
+		return nil, errors.New("receive message error")
+	}
+}
+
+func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+	//TODO: impl logic
+	return nil
+}
+
+//Ack the consumption of a single message
+func (c *consumer) Ack(msg Message) error {
+	return c.AckID(msg.ID())
+}
+
+// Ack the consumption of a single message, identified by its MessageID
+func (c *consumer) AckID(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	err := proto.Unmarshal(msgID.Serialize(), id)
+	if err != nil {
+		c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		return err
+	}
+
+	partition := id.GetPartition()
+	if partition < 0 {
+		return c.consumers[0].AckID(msgID)
+	}
+	return c.consumers[partition].AckID(msgID)
+}
+
+func (c *consumer) AckCumulative(msg Message) error {
+	return c.AckCumulativeID(msg.ID())
+}
+
+func (c *consumer) AckCumulativeID(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	err := proto.Unmarshal(msgID.Serialize(), id)
+	if err != nil {
+		c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		return err
+	}
+
+	partition := id.GetPartition()
+	if partition < 0 {
+		return errors.New("invalid partition index")
+	}
+	return c.consumers[partition].AckCumulativeID(msgID)
+}
+
+func (c *consumer) Close() error {
+	for _, pc := range c.consumers {
+		return pc.Close()
+	}
+	return nil
+}
+
+func (c *consumer) Seek(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	err := proto.Unmarshal(msgID.Serialize(), id)
+	if err != nil {
+		c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		return err
+	}
+
+	partition := id.GetPartition()
+
+	if partition < 0 {
+		return errors.New("invalid partition index")
+	}
+	return c.consumers[partition].Seek(msgID)
+}
+
+func (c *consumer) RedeliverUnackedMessages() error {
+	var errMsg string
+	for _, c := range c.consumers {
+		if err := c.RedeliverUnackedMessages(); err != nil {
+			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+		}
+	}
+
+	if errMsg != "" {
+		return errors.New(errMsg)
+	}
+	return nil
+}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a7b1207..38d372a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,10 +18,16 @@
 package pulsar
 
 import (
+	"time"
+
 	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/golang/protobuf/proto"
 )
 
+func earliestMessageID() MessageID {
+	return newMessageID(-1, -1, -1, -1)
+}
+
 type messageID struct {
 	ledgerID     int64
 	entryID      int64
@@ -64,12 +70,59 @@
 	return id, nil
 }
 
-func earliestMessageID() MessageID {
-	return newMessageID(-1, -1, -1, -1)
-}
-
 const maxLong int64 = 0x7fffffffffffffff
 
 func latestMessageID() MessageID {
 	return newMessageID(maxLong, maxLong, -1, -1)
 }
+
+func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
+	ts := int64(timestamp) * int64(time.Millisecond)
+	seconds := ts / int64(time.Second)
+	nanos := ts - (seconds * int64(time.Second))
+	return time.Unix(seconds, nanos)
+}
+
+func timeToUnixTimestampMillis(t time.Time) uint64 {
+	nanos := t.UnixNano()
+	millis := nanos / int64(time.Millisecond)
+	return uint64(millis)
+}
+
+type message struct {
+	publishTime time.Time
+	eventTime   time.Time
+	key         string
+	payLoad     []byte
+	msgID       MessageID
+	properties  map[string]string
+	topic       string
+}
+
+func (msg *message) Topic() string {
+	return msg.topic
+}
+
+func (msg *message) Properties() map[string]string {
+	return msg.properties
+}
+
+func (msg *message) Payload() []byte {
+	return msg.payLoad
+}
+
+func (msg *message) ID() MessageID {
+	return msg.msgID
+}
+
+func (msg *message) PublishTime() time.Time {
+	return msg.publishTime
+}
+
+func (msg *message) EventTime() time.Time {
+	return msg.eventTime
+}
+
+func (msg *message) Key() string {
+	return msg.key
+}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
new file mode 100644
index 0000000..0d7069f
--- /dev/null
+++ b/pulsar/impl_partition_consumer.go
@@ -0,0 +1,688 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/pulsar/internal"
+	"github.com/apache/pulsar-client-go/util"
+	"github.com/golang/protobuf/proto"
+
+	log "github.com/sirupsen/logrus"
+)
+
+const maxRedeliverUnacknowledged = 1000
+
+type consumerState int
+
+const (
+	consumerInit = iota
+	consumerReady
+	consumerClosing
+	consumerClosed
+)
+
+var (
+	subType  pb.CommandSubscribe_SubType
+	position pb.CommandSubscribe_InitialPosition
+)
+
+type partitionConsumer struct {
+	state  consumerState
+	client *client
+	topic  string
+	log    *log.Entry
+	cnx    internal.Connection
+
+	options      *ConsumerOptions
+	consumerName *string
+	consumerID   uint64
+	subQueue     chan ConsumerMessage
+
+	omu      sync.Mutex // protects following
+	overflow []*pb.MessageIdData
+
+	unAckTracker *UnackedMessageTracker
+
+	eventsChan   chan interface{}
+	partitionIdx int
+}
+
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID int) (*partitionConsumer, error) {
+	c := &partitionConsumer{
+		state:        consumerInit,
+		client:       client,
+		topic:        topic,
+		options:      options,
+		log:          log.WithField("topic", topic),
+		consumerID:   client.rpcClient.NewConsumerID(),
+		partitionIdx: partitionID,
+		eventsChan:   make(chan interface{}),
+		subQueue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
+	}
+
+	c.setDefault(options)
+
+	if options.MessageChannel == nil {
+		options.MessageChannel = make(chan ConsumerMessage, options.ReceiverQueueSize)
+	} else {
+		c.subQueue = options.MessageChannel
+	}
+
+	if options.Name != "" {
+		c.consumerName = &options.Name
+	}
+
+	switch options.Type {
+	case Exclusive:
+		subType = pb.CommandSubscribe_Exclusive
+	case Failover:
+		subType = pb.CommandSubscribe_Failover
+	case Shared:
+		subType = pb.CommandSubscribe_Shared
+	case KeyShared:
+		subType = pb.CommandSubscribe_Key_Shared
+	}
+
+	if options.Type == Shared || options.Type == KeyShared {
+		if options.AckTimeout != 0 {
+			c.unAckTracker = NewUnackedMessageTracker()
+			c.unAckTracker.pc = c
+			c.unAckTracker.Start(int64(options.AckTimeout))
+		}
+	}
+
+	switch options.SubscriptionInitPos {
+	case Latest:
+		position = pb.CommandSubscribe_Latest
+	case Earliest:
+		position = pb.CommandSubscribe_Earliest
+	}
+
+	err := c.grabCnx()
+	if err != nil {
+		log.WithError(err).Errorf("Failed to create consumer")
+		return nil, err
+	}
+	c.log = c.log.WithField("name", c.consumerName)
+	c.log.Info("Created consumer")
+	c.state = consumerReady
+	go c.runEventsLoop()
+
+	return c, nil
+}
+
+func (pc *partitionConsumer) setDefault(options *ConsumerOptions) {
+	if options.ReceiverQueueSize <= 0 {
+		options.ReceiverQueueSize = 1000
+	}
+
+	if options.AckTimeout == 0 {
+		options.AckTimeout = time.Second * 30
+	}
+
+	position = pb.CommandSubscribe_Latest
+	subType = pb.CommandSubscribe_Exclusive
+}
+
+func (pc *partitionConsumer) grabCnx() error {
+	lr, err := pc.client.lookupService.Lookup(pc.topic)
+	if err != nil {
+		pc.log.WithError(err).Warn("Failed to lookup topic")
+		return err
+	}
+
+	pc.log.Debug("Lookup result: ", lr)
+	requestID := pc.client.rpcClient.NewRequestID()
+	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
+		pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
+			RequestId:       proto.Uint64(requestID),
+			Topic:           proto.String(pc.topic),
+			SubType:         subType.Enum(),
+			Subscription:    proto.String(pc.options.SubscriptionName),
+			ConsumerId:      proto.Uint64(pc.consumerID),
+			ConsumerName:    proto.String(pc.options.Name),
+			InitialPosition: position.Enum(),
+			Schema:          nil,
+		})
+
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to create consumer")
+		return err
+	}
+
+	if res.Response.ConsumerStatsResponse != nil {
+		pc.consumerName = res.Response.ConsumerStatsResponse.ConsumerName
+	}
+
+	pc.cnx = res.Cnx
+	pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
+
+	msgType := res.Response.GetType()
+
+	switch msgType {
+	case pb.BaseCommand_SUCCESS:
+		pc.cnx.AddConsumeHandler(pc.consumerID, pc)
+		if err := pc.internalFlow(uint32(pc.options.ReceiverQueueSize)); err != nil {
+			return err
+		}
+		return nil
+	case pb.BaseCommand_ERROR:
+		errMsg := res.Response.GetError()
+		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
+	default:
+		return util.NewUnexpectedErrMsg(msgType, requestID)
+	}
+}
+
+func (pc *partitionConsumer) Topic() string {
+	return pc.topic
+}
+
+func (pc *partitionConsumer) Subscription() string {
+	return pc.options.SubscriptionName
+}
+
+func (pc *partitionConsumer) Unsubscribe() error {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	hu := &handleUnsubscribe{
+		waitGroup: wg,
+		err:       nil,
+	}
+	pc.eventsChan <- hu
+
+	wg.Wait()
+	return hu.err
+}
+
+func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
+		pb.BaseCommand_UNSUBSCRIBE, &pb.CommandUnsubscribe{
+			RequestId:  proto.Uint64(requestID),
+			ConsumerId: proto.Uint64(pc.consumerID),
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		unsub.err = err
+	}
+
+	pc.cnx.DeleteConsumeHandler(pc.consumerID)
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Stop()
+	}
+
+	unsub.waitGroup.Done()
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case cm, ok := <-pc.subQueue:
+		if ok {
+			id := &pb.MessageIdData{}
+			err := proto.Unmarshal(cm.ID().Serialize(), id)
+			if err != nil {
+				pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+				return nil, err
+			}
+			if pc.unAckTracker != nil {
+				pc.unAckTracker.Add(id)
+			}
+			return cm.Message, nil
+		}
+		return nil, newError(ResultConnectError, "receive queue closed")
+	}
+}
+
+func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+	highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
+
+	// request half the buffer's capacity
+	if err := pc.internalFlow(highwater); err != nil {
+		pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+		return err
+	}
+	var receivedSinceFlow uint32
+
+	for {
+		select {
+		case tmpMsg, ok := <-pc.subQueue:
+			if ok {
+				msgs <- tmpMsg
+				id := &pb.MessageIdData{}
+				err := proto.Unmarshal(tmpMsg.ID().Serialize(), id)
+				if err != nil {
+					pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+					return err
+				}
+				if pc.unAckTracker != nil {
+					pc.unAckTracker.Add(id)
+				}
+				receivedSinceFlow++
+				if receivedSinceFlow >= highwater {
+					if err := pc.internalFlow(receivedSinceFlow); err != nil {
+						pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+						return err
+					}
+					receivedSinceFlow = 0
+				}
+				continue
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+
+}
+
+func (pc *partitionConsumer) Ack(msg Message) error {
+	return pc.AckID(msg.ID())
+}
+
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	ha := &handleAck{
+		msgID:     msgID,
+		waitGroup: wg,
+		err:       nil,
+	}
+	pc.eventsChan <- ha
+	wg.Wait()
+	return ha.err
+}
+
+func (pc *partitionConsumer) internalAck(ack *handleAck) {
+	id := &pb.MessageIdData{}
+	messageIDs := make([]*pb.MessageIdData, 0)
+	err := proto.Unmarshal(ack.msgID.Serialize(), id)
+	if err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		ack.err = err
+	}
+
+	messageIDs = append(messageIDs, id)
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
+		pb.BaseCommand_ACK, &pb.CommandAck{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			MessageId:  messageIDs,
+			AckType:    pb.CommandAck_Individual.Enum(),
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		ack.err = err
+	}
+
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Remove(id)
+	}
+	ack.waitGroup.Done()
+}
+
+func (pc *partitionConsumer) AckCumulative(msg Message) error {
+	return pc.AckCumulativeID(msg.ID())
+}
+
+func (pc *partitionConsumer) AckCumulativeID(msgID MessageID) error {
+	hac := &handleAckCumulative{
+		msgID: msgID,
+		err:   nil,
+	}
+	pc.eventsChan <- hac
+
+	return hac.err
+}
+
+func (pc *partitionConsumer) internalAckCumulative(ackCumulative *handleAckCumulative) {
+	id := &pb.MessageIdData{}
+	messageIDs := make([]*pb.MessageIdData, 0)
+	err := proto.Unmarshal(ackCumulative.msgID.Serialize(), id)
+	if err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		ackCumulative.err = err
+	}
+	messageIDs = append(messageIDs, id)
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
+		pb.BaseCommand_ACK, &pb.CommandAck{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			MessageId:  messageIDs,
+			AckType:    pb.CommandAck_Cumulative.Enum(),
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		ackCumulative.err = err
+	}
+
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Remove(id)
+	}
+}
+
+func (pc *partitionConsumer) Close() error {
+	if pc.state != consumerReady {
+		return nil
+	}
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Stop()
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	cc := &handlerClose{&wg, nil}
+	pc.eventsChan <- cc
+
+	wg.Wait()
+	return cc.err
+}
+
+func (pc *partitionConsumer) Seek(msgID MessageID) error {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	hc := &handleSeek{
+		msgID:     msgID,
+		waitGroup: wg,
+		err:       nil,
+	}
+	pc.eventsChan <- hc
+
+	wg.Wait()
+	return hc.err
+}
+
+func (pc *partitionConsumer) internalSeek(seek *handleSeek) {
+	if pc.state == consumerClosing || pc.state == consumerClosed {
+		pc.log.Error("Consumer was already closed")
+		return
+	}
+
+	id := &pb.MessageIdData{}
+	err := proto.Unmarshal(seek.msgID.Serialize(), id)
+	if err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		seek.err = err
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
+		pb.BaseCommand_SEEK, &pb.CommandSeek{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			RequestId:  proto.Uint64(requestID),
+			MessageId:  id,
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		seek.err = err
+	}
+
+	seek.waitGroup.Done()
+}
+
+func (pc *partitionConsumer) RedeliverUnackedMessages() error {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	hr := &handleRedeliver{
+		waitGroup: wg,
+		err:       nil,
+	}
+	pc.eventsChan <- hr
+	wg.Wait()
+	return hr.err
+}
+
+func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
+	pc.omu.Lock()
+	defer pc.omu.Unlock()
+
+	overFlowSize := len(pc.overflow)
+
+	if overFlowSize == 0 {
+		return
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+
+	for i := 0; i < len(pc.overflow); i += maxRedeliverUnacknowledged {
+		end := i + maxRedeliverUnacknowledged
+		if end > overFlowSize {
+			end = overFlowSize
+		}
+		_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
+			pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
+				ConsumerId: proto.Uint64(pc.consumerID),
+				MessageIds: pc.overflow[i:end],
+			})
+		if err != nil {
+			pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+			redeliver.err = err
+		}
+	}
+
+	// clear Overflow slice
+	pc.overflow = nil
+
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.clear()
+	}
+	redeliver.waitGroup.Done()
+}
+
+func (pc *partitionConsumer) runEventsLoop() {
+	for {
+		select {
+		case i := <-pc.eventsChan:
+			switch v := i.(type) {
+			case *handlerClose:
+				pc.internalClose(v)
+				return
+			case *handleSeek:
+				pc.internalSeek(v)
+			case *handleUnsubscribe:
+				pc.internalUnsubscribe(v)
+			case *handleAckCumulative:
+				pc.internalAckCumulative(v)
+			case *handleAck:
+				pc.internalAck(v)
+			case *handleRedeliver:
+				pc.internalRedeliver(v)
+			case *handleConnectionClosed:
+				pc.reconnectToBroker()
+			}
+		}
+	}
+}
+
+func (pc *partitionConsumer) internalClose(req *handlerClose) {
+	if pc.state != consumerReady {
+		req.waitGroup.Done()
+		return
+	}
+
+	pc.state = consumerClosing
+	pc.log.Info("Closing consumer")
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID, pb.BaseCommand_CLOSE_CONSUMER, &pb.CommandCloseConsumer{
+		ConsumerId: proto.Uint64(pc.consumerID),
+		RequestId:  proto.Uint64(requestID),
+	})
+	pc.cnx.DeleteConsumeHandler(pc.consumerID)
+
+	if err != nil {
+		req.err = err
+	} else {
+		pc.log.Info("Closed consumer")
+		pc.state = consumerClosed
+		close(pc.options.MessageChannel)
+		//pc.cnx.UnregisterListener(pc.consumerID)
+	}
+
+	req.waitGroup.Done()
+}
+
+// Flow command gives additional permits to send messages to the consumer.
+// A typical consumer implementation will use a queue to accuMulate these messages
+// before the application is ready to consume them. After the consumer is ready,
+// the client needs to give permission to the broker to push messages.
+func (pc *partitionConsumer) internalFlow(permits uint32) error {
+	if permits <= 0 {
+		return fmt.Errorf("invalid number of permits requested: %d", permits)
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
+		pb.BaseCommand_FLOW, &pb.CommandFlow{
+			ConsumerId:     proto.Uint64(pc.consumerID),
+			MessagePermits: proto.Uint32(permits),
+		})
+
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		return err
+	}
+	return nil
+}
+
+func (pc *partitionConsumer) HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error {
+	msgID := response.GetMessageId()
+
+	id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
+		int(msgID.GetBatchIndex()), pc.partitionIdx)
+
+	msgMeta, payload, err := internal.ParseMessage(headersAndPayload)
+	if err != nil {
+		return fmt.Errorf("parse message error:%s", err)
+	}
+
+	//numMsgs := msgMeta.GetNumMessagesInBatch()
+
+	msg := &message{
+		publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+		eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+		key:         msgMeta.GetPartitionKey(),
+		properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
+		topic:       pc.topic,
+		msgID:       id,
+		payLoad:     payload,
+	}
+
+	consumerMsg := ConsumerMessage{
+		Message:  msg,
+		Consumer: pc,
+	}
+
+	select {
+	case pc.subQueue <- consumerMsg:
+		// Add messageId to Overflow buffer, avoiding duplicates.
+		newMid := response.GetMessageId()
+		var dup bool
+
+		pc.omu.Lock()
+		for _, mid := range pc.overflow {
+			if proto.Equal(mid, newMid) {
+				dup = true
+				break
+			}
+		}
+
+		if !dup {
+			pc.overflow = append(pc.overflow, newMid)
+		}
+		pc.omu.Unlock()
+		return nil
+	default:
+		return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+	}
+}
+
+type handleAck struct {
+	msgID     MessageID
+	waitGroup *sync.WaitGroup
+	err       error
+}
+
+type handleAckCumulative struct {
+	msgID MessageID
+	err   error
+}
+
+type handleUnsubscribe struct {
+	waitGroup *sync.WaitGroup
+	err       error
+}
+
+type handleSeek struct {
+	msgID     MessageID
+	waitGroup *sync.WaitGroup
+	err       error
+}
+
+type handleRedeliver struct {
+	waitGroup *sync.WaitGroup
+	err       error
+}
+
+type handlerClose struct {
+	waitGroup *sync.WaitGroup
+	err       error
+}
+
+type handleConnectionClosed struct{}
+
+func (pc *partitionConsumer) ConnectionClosed() {
+	// Trigger reconnection in the produce goroutine
+	pc.eventsChan <- &handleConnectionClosed{}
+}
+
+func (pc *partitionConsumer) reconnectToBroker() {
+	pc.log.Info("Reconnecting to broker")
+	backoff := internal.Backoff{}
+	for {
+		if pc.state != consumerReady {
+			// Consumer is already closing
+			return
+		}
+
+		err := pc.grabCnx()
+		if err == nil {
+			// Successfully reconnected
+			return
+		}
+
+		d := backoff.Next()
+		pc.log.Info("Retrying reconnection after ", d)
+
+		time.Sleep(d)
+	}
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index ce8a2a9..98e2156 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -157,7 +157,7 @@
 	p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
 
 	if p.pendingQueue.Size() > 0 {
-		p.log.Infof("Resending %v pending batches", p.pendingQueue.Size())
+		p.log.Infof("Resending %d pending batches", p.pendingQueue.Size())
 		for it := p.pendingQueue.Iterator(); it.HasNext(); {
 			p.cnx.WriteData(it.Next().(*pendingItem).batchData)
 		}
@@ -165,8 +165,7 @@
 	return nil
 }
 
-type connectionClosed struct {
-}
+type connectionClosed struct{}
 
 func (p *partitionProducer) ConnectionClosed() {
 	// Trigger reconnection in the produce goroutine
@@ -230,7 +229,7 @@
 
 	msg := request.msg
 
-	sendAsBatch := !p.options.DisableBatching && request.msg.ReplicationClusters == nil
+	sendAsBatch := !p.options.DisableBatching && msg.ReplicationClusters == nil
 	smm := &pb.SingleMessageMetadata{
 		PayloadSize: proto.Int(len(msg.Payload)),
 	}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 497a18c..75ed9c5 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -69,23 +69,29 @@
 
 	type ProducerError struct {
 		partition int
-		Producer
-		error
+		prod      Producer
+		err       error
 	}
 
 	c := make(chan ProducerError, numPartitions)
 
 	for partitionIdx, partition := range partitions {
-		go func(index int) {
-			prod, err := newPartitionProducer(client, partition, options, index)
-			c <- ProducerError{partitionIdx, prod, err}
-		}(partitionIdx)
+		go func(partitionIdx int, partition string) {
+			prod, err := newPartitionProducer(client, partition, options, partitionIdx)
+			c <- ProducerError{
+				partition: partitionIdx,
+				prod:      prod,
+				err:       err,
+			}
+		}(partitionIdx, partition)
 	}
 
 	for i := 0; i < numPartitions; i++ {
-		pe := <-c
-		err = pe.error
-		p.producers[pe.partition] = pe.Producer
+		pe, ok := <-c
+		if ok {
+			err = pe.err
+			p.producers[pe.partition] = pe.prod
+		}
 	}
 
 	if err != nil {
@@ -97,6 +103,7 @@
 		}
 		return nil, err
 	}
+
 	return p, nil
 }
 
@@ -134,21 +141,15 @@
 }
 
 func (p *producer) Flush() error {
-	var err error
 	for _, pp := range p.producers {
-		if e := pp.Flush(); e != nil && err == nil {
-			err = e
-		}
+		return pp.Flush()
 	}
-	return err
+	return nil
 }
 
 func (p *producer) Close() error {
-	var err error
 	for _, pp := range p.producers {
-		if e := pp.Close(); e != nil && err == nil {
-			err = e
-		}
+		return pp.Close()
 	}
-	return err
+	return nil
 }
diff --git a/pulsar/internal/checksum.go b/pulsar/internal/checksum.go
index 11d8d4f..3fc37ae 100644
--- a/pulsar/internal/checksum.go
+++ b/pulsar/internal/checksum.go
@@ -17,13 +17,34 @@
 
 package internal
 
-import "hash/crc32"
+import (
+    `hash`
+    `hash/crc32`
+)
 
 // crc32cTable holds the precomputed crc32 hash table
 // used by Pulsar (crc32c)
 var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
 
+type CheckSum struct {
+    hash hash.Hash
+}
+
 // Crc32cCheckSum handles computing the checksum.
 func Crc32cCheckSum(data []byte) uint32 {
 	return crc32.Checksum(data, crc32cTable)
 }
+
+func (cs *CheckSum) Write(p []byte) (int, error) {
+    if cs.hash == nil {
+        cs.hash = crc32.New(crc32cTable)
+    }
+    return cs.hash.Write(p)
+}
+
+func (cs *CheckSum) compute() []byte {
+    if cs.hash == nil {
+        return nil
+    }
+    return cs.hash.Sum(nil)
+}
diff --git a/pulsar/internal/checksum_test.go b/pulsar/internal/checksum_test.go
new file mode 100644
index 0000000..23dc621
--- /dev/null
+++ b/pulsar/internal/checksum_test.go
@@ -0,0 +1,51 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package internal
+
+import (
+    "bytes"
+    "hash/crc32"
+    "testing"
+)
+
+func TestFrameChecksum(t *testing.T) {
+    input := []byte{1, 2, 3, 4, 5}
+    var f CheckSum
+
+    if got := f.compute(); got != nil {
+        t.Fatalf("compute() = %v; expected nil", got)
+    }
+
+    if _, err := f.Write(input); err != nil {
+        t.Fatalf("Write() err = %v; expected nil", err)
+    }
+
+    h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+    if _, err := h.Write(input); err != nil {
+        t.Fatal(err)
+    }
+
+    if got, expected := f.compute(), h.Sum(nil); !bytes.Equal(got, expected) {
+        t.Fatalf("compute() = %x; expected %x", got, expected)
+    } else {
+        t.Logf("compute() = 0x%x", got)
+    }
+}
+
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 05ba409..5068ebb 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,16 +18,21 @@
 package internal
 
 import (
-	"github.com/apache/pulsar-client-go/pkg/pb"
-	"github.com/golang/protobuf/proto"
+    `bytes`
+    `encoding/binary`
+    `fmt`
+    "github.com/golang/protobuf/proto"
+    `io`
 
-	log "github.com/sirupsen/logrus"
+    "github.com/apache/pulsar-client-go/pkg/pb"
+    log "github.com/sirupsen/logrus"
 )
 
-// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-const MaxFrameSize = 5 * 1024 * 1024
-
-const magicCrc32c uint16 = 0x0e01
+const (
+	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
+	MaxFrameSize = 5 * 1024 * 1024
+	magicCrc32c uint16 = 0x0e01
+)
 
 func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
 	cmd := &pb.BaseCommand{
@@ -42,6 +47,10 @@
 		cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
 	case pb.BaseCommand_PRODUCER:
 		cmd.Producer = msg.(*pb.CommandProducer)
+	case pb.BaseCommand_SUBSCRIBE:
+		cmd.Subscribe = msg.(*pb.CommandSubscribe)
+	case pb.BaseCommand_FLOW:
+		cmd.Flow = msg.(*pb.CommandFlow)
 	case pb.BaseCommand_PING:
 		cmd.Ping = msg.(*pb.CommandPing)
 	case pb.BaseCommand_PONG:
@@ -50,6 +59,16 @@
 		cmd.Send = msg.(*pb.CommandSend)
 	case pb.BaseCommand_CLOSE_PRODUCER:
 		cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
+	case pb.BaseCommand_CLOSE_CONSUMER:
+		cmd.CloseConsumer = msg.(*pb.CommandCloseConsumer)
+	case pb.BaseCommand_ACK:
+		cmd.Ack = msg.(*pb.CommandAck)
+	case pb.BaseCommand_SEEK:
+		cmd.Seek = msg.(*pb.CommandSeek)
+	case pb.BaseCommand_UNSUBSCRIBE:
+		cmd.Unsubscribe = msg.(*pb.CommandUnsubscribe)
+	case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
+		cmd.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
 	default:
 		log.Panic("Missing command type: ", cmdType)
 	}
@@ -67,6 +86,110 @@
 	wb.Write(payload)
 }
 
+func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payload []byte, err error) {
+	// reusable buffer for 4-byte uint32s
+	buf32 := make([]byte, 4)
+	r := bytes.NewReader(headersAndPayload)
+	// Wrap our reader so that we can only read
+	// bytes from our frame
+	lr := &io.LimitedReader{
+		N: int64(len(headersAndPayload)),
+		R: r,
+	}
+	// There are 3 possibilities for the following fields:
+	//  - EOF: If so, this is a "simple" command. No more parsing required.
+	//  - 2-byte magic number: Indicates the following 4 bytes are a checksum
+	//  - 4-byte metadata size
+
+	// The message may optionally stop here. If so,
+	// this is a "simple" command.
+	if lr.N <= 0 {
+		return nil, nil, nil
+	}
+
+	// Optionally, the next 2 bytes may be the magicNumber. If
+	// so, it indicates that the following 4 bytes are a checksum.
+	// If not, the following 2 bytes (plus the 2 bytes already read),
+	// are the metadataSize, which is why a 4 byte buffer is used.
+	if _, err = io.ReadFull(lr, buf32); err != nil {
+		return nil, nil, err
+	}
+
+	// Check for magicNumber which indicates a checksum
+	var chksum CheckSum
+	var expectedChksum []byte
+
+	magicNumber := make([]byte, 2)
+	binary.BigEndian.PutUint16(magicNumber, magicCrc32c)
+	if magicNumber[0] == buf32[0] && magicNumber[1] == buf32[1] {
+		expectedChksum = make([]byte, 4)
+
+		// We already read the 2-byte magicNumber and the
+		// initial 2 bytes of the checksum
+		expectedChksum[0] = buf32[2]
+		expectedChksum[1] = buf32[3]
+
+		// Read the remaining 2 bytes of the checksum
+		if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil {
+			return nil, nil, err
+		}
+
+		// Use a tee reader to compute the checksum
+		// of everything consumed after this point
+		lr.R = io.TeeReader(lr.R, &chksum)
+
+		// Fill buffer with metadata size, which is what it
+		// would already contain if there were no magic number / checksum
+		if _, err = io.ReadFull(lr, buf32); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	// Read metadataSize
+	metadataSize := binary.BigEndian.Uint32(buf32)
+	// guard against allocating large buffer
+	if metadataSize > MaxFrameSize {
+		return nil, nil, fmt.Errorf("frame metadata size (%d) "+
+				"cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
+	}
+
+	// Read protobuf encoded metadata
+	metaBuf := make([]byte, metadataSize)
+	if _, err = io.ReadFull(lr, metaBuf); err != nil {
+		return nil, nil, err
+	}
+	msgMeta = new(pb.MessageMetadata)
+	if err = proto.Unmarshal(metaBuf, msgMeta); err != nil {
+		return nil, nil, err
+	}
+
+    batchLen := make([]byte, 2)
+    if _, err = io.ReadFull(lr, batchLen); err != nil {
+        return nil, nil, err
+    }
+
+    // Anything left in the frame is considered
+    // the payload and can be any sequence of bytes.
+	if lr.N > 0 {
+		// guard against allocating large buffer
+		if lr.N > MaxFrameSize {
+			return nil, nil, fmt.Errorf("frame payload size (%d) "+
+					"cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
+		}
+		payload = make([]byte, lr.N)
+		if _, err = io.ReadFull(lr, payload); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
+		return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
+				"not match given checksum (0x%X)", computed, expectedChksum)
+	}
+
+	return msgMeta, payload, nil
+}
+
 func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
 	// Wire format
 	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7d5ee74..11337ec 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -58,9 +58,15 @@
 	WriteData(data []byte)
 	RegisterListener(id uint64, listener ConnectionListener)
 	UnregisterListener(id uint64)
+	AddConsumeHandler(id uint64, handler ConsumerHandler)
+	DeleteConsumeHandler(id uint64)
 	Close()
 }
 
+type ConsumerHandler interface {
+    HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error
+}
+
 type connectionState int
 
 const (
@@ -101,6 +107,7 @@
 	writeRequests    chan []byte
 	pendingReqs      map[uint64]*request
 	listeners        map[uint64]ConnectionListener
+	connWrapper      *ConnWrapper
 
 	tlsOptions *TLSOptions
 	auth       auth.Provider
@@ -122,6 +129,7 @@
 		incomingRequests: make(chan *request),
 		writeRequests:    make(chan []byte),
 		listeners:        make(map[uint64]ConnectionListener),
+        connWrapper:      NewConnWrapper(),
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -295,6 +303,7 @@
 func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
 	c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
 	c.lastDataReceivedTime = time.Now()
+    var err error
 
 	switch *cmd.Type {
 	case pb.BaseCommand_SUCCESS:
@@ -331,6 +340,7 @@
 	case pb.BaseCommand_SEND_ERROR:
 
 	case pb.BaseCommand_MESSAGE:
+        err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
 	case pb.BaseCommand_PING:
 		c.handlePing()
 	case pb.BaseCommand_PONG:
@@ -339,7 +349,9 @@
 	case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
 
 	default:
-		c.log.Errorf("Received invalid command type: %s", cmd.Type)
+        if err != nil {
+            c.log.Errorf("Received invalid command type: %s", cmd.Type)
+        }
 		c.Close()
 	}
 }
@@ -382,6 +394,21 @@
 	}
 }
 
+func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
+    c.log.Debug("Got Message: ", response)
+    consumerId := response.GetConsumerId()
+    if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
+        err := consumer.HandlerMessage(response, payload)
+        if err != nil {
+            c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
+            return errors.New("handler not found")
+        }
+    } else {
+        c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
+    }
+    return nil
+}
+
 func (c *connection) sendPing() {
 	if c.lastDataReceivedTime.Add(2 * keepAliveInterval).Before(time.Now()) {
 		// We have not received a response to the previous Ping request, the
@@ -485,3 +512,26 @@
 
 	return tlsConfig, nil
 }
+
+type ConnWrapper struct {
+	Rwmu             sync.RWMutex
+	Consumers        map[uint64]ConsumerHandler
+}
+
+func NewConnWrapper() *ConnWrapper {
+	return &ConnWrapper{
+		Consumers: make(map[uint64]ConsumerHandler),
+	}
+}
+
+func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
+    c.connWrapper.Rwmu.Lock()
+    c.connWrapper.Consumers[id] = handler
+    c.connWrapper.Rwmu.Unlock()
+}
+
+func (c *connection) DeleteConsumeHandler(id uint64) {
+    c.connWrapper.Rwmu.Lock()
+    delete(c.connWrapper.Consumers, id)
+    c.connWrapper.Rwmu.Unlock()
+}
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index 8baddf3..de7f058 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -95,6 +95,12 @@
 	return nil, nil
 }
 
+func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
+		message proto.Message) (*RPCResult, error) {
+	assert.Fail(c.t, "Shouldn't be called")
+	return nil, nil
+}
+
 func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
 	return &r
 }
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 8234dc8..2b68a71 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,6 +45,8 @@
 	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
 		cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 
+	RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
+
 	RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 }
 
@@ -110,6 +112,19 @@
 	return rpcResult, nil
 }
 
+func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
+		message proto.Message) (*RPCResult, error) {
+	rpcResult := &RPCResult{
+		Cnx: cnx,
+	}
+
+	cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+		rpcResult.Response = response
+	})
+
+	return rpcResult, nil
+}
+
 func (c *rpcClient) NewRequestID() uint64 {
 	return atomic.AddUint64(&c.requestIDGenerator, 1)
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index 1862c18..a70827d 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -62,7 +62,7 @@
 	// EventTime get the event time associated with this message. It is typically set by the applications via
 	// `ProducerMessage.EventTime`.
 	// If there isn't any event time associated with this event, it will be nil.
-	EventTime() *time.Time
+	EventTime() time.Time
 
 	// Key get the key of the message, if any
 	Key() string
diff --git a/pulsar/unackMsgTracker_test.go b/pulsar/unackMsgTracker_test.go
new file mode 100644
index 0000000..d292fc8
--- /dev/null
+++ b/pulsar/unackMsgTracker_test.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+    `github.com/apache/pulsar-client-go/pkg/pb`
+    `github.com/golang/protobuf/proto`
+    `github.com/stretchr/testify/assert`
+    `testing`
+)
+
+func TestUnackedMessageTracker(t *testing.T) {
+    unAckTracker := NewUnackedMessageTracker()
+
+    var msgIDs []*pb.MessageIdData
+
+    for i := 0; i < 5; i++ {
+        msgID := &pb.MessageIdData{
+            LedgerId:   proto.Uint64(1),
+            EntryId:    proto.Uint64(uint64(i)),
+            Partition:  proto.Int32(-1),
+            BatchIndex: proto.Int32(-1),
+        }
+
+        msgIDs = append(msgIDs, msgID)
+    }
+
+    for _, msgID := range msgIDs {
+        ok := unAckTracker.Add(msgID)
+        assert.True(t, ok)
+    }
+
+    flag := unAckTracker.IsEmpty()
+    assert.False(t, flag)
+
+    num := unAckTracker.Size()
+    assert.Equal(t, num, 5)
+
+    for index, msgID := range msgIDs {
+        unAckTracker.Remove(msgID)
+        assert.Equal(t, 4-index, unAckTracker.Size())
+    }
+
+    num = unAckTracker.Size()
+    assert.Equal(t, num, 0)
+
+    flag = unAckTracker.IsEmpty()
+    assert.True(t, flag)
+}
diff --git a/pulsar/unackedMsgTracker.go b/pulsar/unackedMsgTracker.go
new file mode 100644
index 0000000..8ec51c6
--- /dev/null
+++ b/pulsar/unackedMsgTracker.go
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/golang/protobuf/proto"
+
+	set "github.com/deckarep/golang-set"
+	log "github.com/sirupsen/logrus"
+)
+
+type UnackedMessageTracker struct {
+	cmu        sync.RWMutex // protects following
+	currentSet set.Set
+	oldOpenSet set.Set
+	timeout    *time.Ticker
+
+	pc  *partitionConsumer
+	pcs []*partitionConsumer
+}
+
+// NewUnackedMessageTracker init UnackedMessageTracker object
+func NewUnackedMessageTracker() *UnackedMessageTracker {
+	unAckTracker := &UnackedMessageTracker{
+		currentSet: set.NewSet(),
+		oldOpenSet: set.NewSet(),
+	}
+
+	return unAckTracker
+}
+
+// Size return the size of current set and old open set cardinality
+func (t *UnackedMessageTracker) Size() int {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	return t.currentSet.Cardinality() + t.oldOpenSet.Cardinality()
+}
+
+// IsEmpty check if the currentSet or oldOpenSet are empty.
+func (t *UnackedMessageTracker) IsEmpty() bool {
+	t.cmu.RLock()
+	defer t.cmu.RUnlock()
+
+	return t.currentSet.Cardinality() == 0 && t.oldOpenSet.Cardinality() == 0
+}
+
+// Add will add message id data to currentSet and remove the message id from oldOpenSet.
+func (t *UnackedMessageTracker) Add(id *pb.MessageIdData) bool {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	t.oldOpenSet.Remove(id)
+	return t.currentSet.Add(id)
+}
+
+// Remove will remove message id data from currentSet and oldOpenSet
+func (t *UnackedMessageTracker) Remove(id *pb.MessageIdData) {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	t.currentSet.Remove(id)
+	t.oldOpenSet.Remove(id)
+}
+
+func (t *UnackedMessageTracker) clear() {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	t.currentSet.Clear()
+	t.oldOpenSet.Clear()
+}
+
+func (t *UnackedMessageTracker) toggle() {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	t.currentSet, t.oldOpenSet = t.oldOpenSet, t.currentSet
+}
+
+func (t *UnackedMessageTracker) isAckTimeout() bool {
+	t.cmu.RLock()
+	defer t.cmu.RUnlock()
+
+	return !(t.oldOpenSet.Cardinality() == 0)
+}
+
+func (t *UnackedMessageTracker) lessThanOrEqual(id1, id2 pb.MessageIdData) bool {
+	return id1.GetPartition() == id2.GetPartition() &&
+		(id1.GetLedgerId() < id2.GetLedgerId() || id1.GetEntryId() <= id2.GetEntryId())
+}
+
+func (t *UnackedMessageTracker) RemoveMessagesTill(id pb.MessageIdData) int {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	counter := 0
+
+	t.currentSet.Each(func(elem interface{}) bool {
+		if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
+			t.currentSet.Remove(elem)
+			counter++
+		}
+		return true
+	})
+
+	t.oldOpenSet.Each(func(elem interface{}) bool {
+		if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
+			t.currentSet.Remove(elem)
+			counter++
+		}
+		return true
+	})
+
+	return counter
+}
+
+func (t *UnackedMessageTracker) Start(ackTimeoutMillis int64) {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+	t.timeout = time.NewTicker((time.Duration(ackTimeoutMillis)) * time.Millisecond)
+
+	go t.handlerCmd(ackTimeoutMillis)
+}
+
+func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
+	for {
+		select {
+		case tick := <-t.timeout.C:
+			if t.isAckTimeout() {
+				log.Debugf(" %d messages have timed-out", t.oldOpenSet.Cardinality())
+				messageIds := make([]*pb.MessageIdData, 0)
+
+				t.oldOpenSet.Each(func(i interface{}) bool {
+					messageIds = append(messageIds, i.(*pb.MessageIdData))
+					return false
+				})
+
+				log.Debugf("messageID length is:%d", len(messageIds))
+
+				t.oldOpenSet.Clear()
+
+				if t.pc != nil {
+					requestID := t.pc.client.rpcClient.NewRequestID()
+					cmd := &pb.CommandRedeliverUnacknowledgedMessages{
+						ConsumerId: proto.Uint64(t.pc.consumerID),
+						MessageIds: messageIds,
+					}
+
+					_, err := t.pc.client.rpcClient.RequestOnCnx(t.pc.cnx, requestID,
+						pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
+					if err != nil {
+						t.pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+						return
+					}
+
+					log.Debugf("consumer:%v redeliver messages num:%d", t.pc.consumerName, len(messageIds))
+				} else if t.pcs != nil {
+					messageIdsMap := make(map[int32][]*pb.MessageIdData)
+					for _, msgID := range messageIds {
+						messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
+					}
+
+					for index, subConsumer := range t.pcs {
+						if messageIdsMap[int32(index)] != nil {
+							requestID := subConsumer.client.rpcClient.NewRequestID()
+							cmd := &pb.CommandRedeliverUnacknowledgedMessages{
+								ConsumerId: proto.Uint64(subConsumer.consumerID),
+								MessageIds: messageIdsMap[int32(index)],
+							}
+
+							_, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.cnx, requestID,
+								pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
+							if err != nil {
+								subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
+								return
+							}
+						}
+					}
+				}
+			}
+			log.Debug("Tick at ", tick)
+		}
+
+		t.toggle()
+	}
+}
+
+func (t *UnackedMessageTracker) Stop() {
+	t.timeout.Stop()
+	log.Debug("stop ticker ", t.timeout)
+
+	t.clear()
+}
diff --git a/util/error.go b/util/error.go
new file mode 100644
index 0000000..6a051f5
--- /dev/null
+++ b/util/error.go
@@ -0,0 +1,50 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package util
+
+import (
+    `fmt`
+    `github.com/apache/pulsar-client-go/pkg/pb`
+)
+
+// NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
+// Optionally provide a list of IDs associated with the message
+// for additional context in the error message.
+func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *UnexpectedErrMsg {
+    return &UnexpectedErrMsg{
+        msgType: msgType,
+        ids:     ids,
+    }
+}
+
+// UnexpectedErrMsg is returned when an unexpected message is received.
+type UnexpectedErrMsg struct {
+    msgType pb.BaseCommand_Type
+    ids     []interface{}
+}
+
+// Error satisfies the error interface.
+func (e *UnexpectedErrMsg) Error() string {
+    msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String())
+    for _, id := range e.ids {
+        msg += fmt.Sprintf(" id=%v", id)
+    }
+    return msg
+}
diff --git a/util/util.go b/util/util.go
new file mode 100644
index 0000000..06a7e53
--- /dev/null
+++ b/util/util.go
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package util
+
+import (
+    `reflect`
+)
+
+// IsNil check if the interface is nil
+func IsNil(i interface{}) bool {
+    vi := reflect.ValueOf(i)
+    if vi.Kind() == reflect.Ptr {
+        return vi.IsNil()
+    }
+    return false
+}
+
diff --git a/util/util_test.go b/util/util_test.go
new file mode 100644
index 0000000..2e1195c
--- /dev/null
+++ b/util/util_test.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package util
+
+import (
+    `github.com/stretchr/testify/assert`
+    `testing`
+)
+
+func TestIsNil(t *testing.T) {
+    var a interface{} = nil
+    var b interface{} = (*int)(nil)
+
+    assert.True(t, a == nil)
+    assert.False(t, b == nil)
+}