Merge master code and fix conflict
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
diff --git a/examples/consumer-listener/consumer-listener.go b/examples/consumer-listener/consumer-listener.go
new file mode 100644
index 0000000..c20d731
--- /dev/null
+++ b/examples/consumer-listener/consumer-listener.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "fmt"
+ "log"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+)
+
+// main runs the example and logs a fatal error (exit status 1) if run fails.
+func main() {
+	if err := run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// run acks every message from "topic-1"; it returns errors instead of exiting so the deferred Close calls still run.
+func run() error {
+	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	channel := make(chan pulsar.ConsumerMessage, 100)
+	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+		Topic:            "topic-1",
+		SubscriptionName: "my-subscription",
+		Type:             pulsar.Shared,
+		MessageChannel:   channel,
+	})
+	if err != nil {
+		return err
+	}
+	defer consumer.Close()
+
+	// Each value received from the channel holds the message and the consumer it was received from. That is not
+	// needed here because there is a single consumer, but the channel could be shared across multiple consumers.
+	for cm := range channel {
+		msg := cm.Message
+		fmt.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
+		if err := consumer.Ack(msg); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
new file mode 100644
index 0000000..3010853
--- /dev/null
+++ b/examples/consumer/consumer.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	"github.com/apache/pulsar-client-go/pulsar"
+)
+
+// main runs the example and logs a fatal error (exit status 1) if run fails.
+func main() {
+	if err := run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// run receives and acks ten messages, then unsubscribes; errors are returned so the deferred Close calls still run.
+func run() error {
+	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1", SubscriptionName: "my-sub", Type: pulsar.Shared})
+	if err != nil {
+		return err
+	}
+	defer consumer.Close()
+
+	ctx := context.Background() // created once and reused for every Receive call
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		if err != nil {
+			return err
+		}
+		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+			msg.ID(), string(msg.Payload()))
+		if err := consumer.Ack(msg); err != nil {
+			return err
+		}
+	}
+	return consumer.Unsubscribe()
+}
diff --git a/examples/producer/producer.go b/examples/producer/producer.go
new file mode 100644
index 0000000..56b87b5
--- /dev/null
+++ b/examples/producer/producer.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	"github.com/apache/pulsar-client-go/pulsar"
+)
+
+// main runs the example and logs a fatal error (exit status 1) if run fails.
+func main() {
+	if err := run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// run sends "hello-0" through "hello-9" to "topic-1"; errors are returned so the deferred Close calls still run.
+func run() error {
+	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "topic-1"})
+	if err != nil {
+		return err
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	for i := 0; i < 10; i++ {
+		msg := &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))}
+		if err := producer.Send(ctx, msg); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/go.mod b/go.mod
index 2433c96..dddb428 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,16 @@
go 1.12
require (
+ github.com/DataDog/zstd v1.4.0 // indirect
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
+ github.com/cespare/xxhash v1.1.0 // indirect
+ github.com/deckarep/golang-set v1.7.1
github.com/golang/protobuf v1.3.1
+ github.com/google/go-cmp v0.3.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
+ github.com/klauspost/compress v1.6.2
+ github.com/klauspost/cpuid v1.2.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.4.1
diff --git a/go.sum b/go.sum
index 0fc836c..a1ccf2f 100644
--- a/go.sum
+++ b/go.sum
@@ -1,14 +1,28 @@
+github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo=
+github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
+github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/klauspost/compress v1.6.2 h1:D9kM6nOc1x+yA/DW/k81uG1xdmwqCMQ/A266P1edQEw=
+github.com/klauspost/compress v1.6.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
+github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
@@ -19,6 +33,7 @@
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index 449c205..b391336 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -21,8 +21,7 @@
"github.com/pierrec/lz4"
)
-type lz4Provider struct {
-}
+type lz4Provider struct{} // lz4Provider declares no fields
// NewLz4Provider return a interface of Provider.
func NewLz4Provider() Provider {
diff --git a/pkg/pb/PulsarApi.pb.go b/pkg/pb/PulsarApi.pb.go
index 1406462..f47feed 100644
--- a/pkg/pb/PulsarApi.pb.go
+++ b/pkg/pb/PulsarApi.pb.go
@@ -21,10 +21,11 @@
type CompressionType int32
const (
- CompressionType_NONE CompressionType = 0
- CompressionType_LZ4 CompressionType = 1
- CompressionType_ZLIB CompressionType = 2
- CompressionType_ZSTD CompressionType = 3
+ CompressionType_NONE CompressionType = 0
+ CompressionType_LZ4 CompressionType = 1
+ CompressionType_ZLIB CompressionType = 2
+ CompressionType_ZSTD CompressionType = 3
+ CompressionType_SNAPPY CompressionType = 4
)
var CompressionType_name = map[int32]string{
@@ -32,12 +33,14 @@
1: "LZ4",
2: "ZLIB",
3: "ZSTD",
+ 4: "SNAPPY",
}
var CompressionType_value = map[string]int32{
- "NONE": 0,
- "LZ4": 1,
- "ZLIB": 2,
- "ZSTD": 3,
+ "NONE": 0,
+ "LZ4": 1,
+ "ZLIB": 2,
+ "ZSTD": 3,
+ "SNAPPY": 4,
}
func (x CompressionType) Enum() *CompressionType {
@@ -57,7 +60,7 @@
return nil
}
func (CompressionType) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{0}
}
type ServerError int32
@@ -83,6 +86,7 @@
ServerError_ProducerBusy ServerError = 16
ServerError_InvalidTopicName ServerError = 17
ServerError_IncompatibleSchema ServerError = 18
+ ServerError_ConsumerAssignError ServerError = 19
)
var ServerError_name = map[int32]string{
@@ -105,6 +109,7 @@
16: "ProducerBusy",
17: "InvalidTopicName",
18: "IncompatibleSchema",
+ 19: "ConsumerAssignError",
}
var ServerError_value = map[string]int32{
"UnknownError": 0,
@@ -126,6 +131,7 @@
"ProducerBusy": 16,
"InvalidTopicName": 17,
"IncompatibleSchema": 18,
+ "ConsumerAssignError": 19,
}
func (x ServerError) Enum() *ServerError {
@@ -145,7 +151,7 @@
return nil
}
func (ServerError) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{1}
}
type AuthMethod int32
@@ -184,7 +190,7 @@
return nil
}
func (AuthMethod) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{2}
}
// Each protocol version identify new features that are
@@ -208,6 +214,7 @@
// Added CommandActiveConsumerChange
// Added CommandGetTopicsOfNamespace
ProtocolVersion_v13 ProtocolVersion = 13
+ ProtocolVersion_v14 ProtocolVersion = 14
)
var ProtocolVersion_name = map[int32]string{
@@ -225,6 +232,7 @@
11: "v11",
12: "v12",
13: "v13",
+ 14: "v14",
}
var ProtocolVersion_value = map[string]int32{
"v0": 0,
@@ -241,6 +249,7 @@
"v11": 11,
"v12": 12,
"v13": 13,
+ "v14": 14,
}
func (x ProtocolVersion) Enum() *ProtocolVersion {
@@ -260,32 +269,65 @@
return nil
}
func (ProtocolVersion) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{3}
}
type Schema_Type int32
const (
- Schema_None Schema_Type = 0
- Schema_String Schema_Type = 1
- Schema_Json Schema_Type = 2
- Schema_Protobuf Schema_Type = 3
- Schema_Avro Schema_Type = 4
+ Schema_None Schema_Type = 0
+ Schema_String Schema_Type = 1
+ Schema_Json Schema_Type = 2
+ Schema_Protobuf Schema_Type = 3
+ Schema_Avro Schema_Type = 4
+ Schema_Bool Schema_Type = 5
+ Schema_Int8 Schema_Type = 6
+ Schema_Int16 Schema_Type = 7
+ Schema_Int32 Schema_Type = 8
+ Schema_Int64 Schema_Type = 9
+ Schema_Float Schema_Type = 10
+ Schema_Double Schema_Type = 11
+ Schema_Date Schema_Type = 12
+ Schema_Time Schema_Type = 13
+ Schema_Timestamp Schema_Type = 14
+ Schema_KeyValue Schema_Type = 15
)
var Schema_Type_name = map[int32]string{
- 0: "None",
- 1: "String",
- 2: "Json",
- 3: "Protobuf",
- 4: "Avro",
+ 0: "None",
+ 1: "String",
+ 2: "Json",
+ 3: "Protobuf",
+ 4: "Avro",
+ 5: "Bool",
+ 6: "Int8",
+ 7: "Int16",
+ 8: "Int32",
+ 9: "Int64",
+ 10: "Float",
+ 11: "Double",
+ 12: "Date",
+ 13: "Time",
+ 14: "Timestamp",
+ 15: "KeyValue",
}
var Schema_Type_value = map[string]int32{
- "None": 0,
- "String": 1,
- "Json": 2,
- "Protobuf": 3,
- "Avro": 4,
+ "None": 0,
+ "String": 1,
+ "Json": 2,
+ "Protobuf": 3,
+ "Avro": 4,
+ "Bool": 5,
+ "Int8": 6,
+ "Int16": 7,
+ "Int32": 8,
+ "Int64": 9,
+ "Float": 10,
+ "Double": 11,
+ "Date": 12,
+ "Time": 13,
+ "Timestamp": 14,
+ "KeyValue": 15,
}
func (x Schema_Type) Enum() *Schema_Type {
@@ -305,26 +347,29 @@
return nil
}
func (Schema_Type) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{0, 0}
}
type CommandSubscribe_SubType int32
const (
- CommandSubscribe_Exclusive CommandSubscribe_SubType = 0
- CommandSubscribe_Shared CommandSubscribe_SubType = 1
- CommandSubscribe_Failover CommandSubscribe_SubType = 2
+ CommandSubscribe_Exclusive CommandSubscribe_SubType = 0
+ CommandSubscribe_Shared CommandSubscribe_SubType = 1
+ CommandSubscribe_Failover CommandSubscribe_SubType = 2
+ CommandSubscribe_Key_Shared CommandSubscribe_SubType = 3
)
var CommandSubscribe_SubType_name = map[int32]string{
0: "Exclusive",
1: "Shared",
2: "Failover",
+ 3: "Key_Shared",
}
var CommandSubscribe_SubType_value = map[string]int32{
- "Exclusive": 0,
- "Shared": 1,
- "Failover": 2,
+ "Exclusive": 0,
+ "Shared": 1,
+ "Failover": 2,
+ "Key_Shared": 3,
}
func (x CommandSubscribe_SubType) Enum() *CommandSubscribe_SubType {
@@ -344,7 +389,7 @@
return nil
}
func (CommandSubscribe_SubType) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{12, 0}
}
type CommandSubscribe_InitialPosition int32
@@ -380,7 +425,7 @@
return nil
}
func (CommandSubscribe_InitialPosition) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 1}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{12, 1}
}
type CommandPartitionedTopicMetadataResponse_LookupType int32
@@ -416,7 +461,7 @@
return nil
}
func (CommandPartitionedTopicMetadataResponse_LookupType) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{14, 0}
}
type CommandLookupTopicResponse_LookupType int32
@@ -455,7 +500,7 @@
return nil
}
func (CommandLookupTopicResponse_LookupType) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{16, 0}
}
type CommandAck_AckType int32
@@ -491,7 +536,7 @@
return nil
}
func (CommandAck_AckType) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{22, 0}
}
// Acks can contain a flag to indicate the consumer
@@ -539,7 +584,7 @@
return nil
}
func (CommandAck_ValidationError) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 1}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{22, 1}
}
type CommandGetTopicsOfNamespace_Mode int32
@@ -578,7 +623,7 @@
return nil
}
func (CommandGetTopicsOfNamespace_Mode) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{40, 0}
}
type BaseCommand_Type int32
@@ -618,6 +663,8 @@
BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE BaseCommand_Type = 33
BaseCommand_GET_SCHEMA BaseCommand_Type = 34
BaseCommand_GET_SCHEMA_RESPONSE BaseCommand_Type = 35
+ BaseCommand_AUTH_CHALLENGE BaseCommand_Type = 36
+ BaseCommand_AUTH_RESPONSE BaseCommand_Type = 37
)
var BaseCommand_Type_name = map[int32]string{
@@ -655,6 +702,8 @@
33: "GET_TOPICS_OF_NAMESPACE_RESPONSE",
34: "GET_SCHEMA",
35: "GET_SCHEMA_RESPONSE",
+ 36: "AUTH_CHALLENGE",
+ 37: "AUTH_RESPONSE",
}
var BaseCommand_Type_value = map[string]int32{
"CONNECT": 2,
@@ -691,6 +740,8 @@
"GET_TOPICS_OF_NAMESPACE_RESPONSE": 33,
"GET_SCHEMA": 34,
"GET_SCHEMA_RESPONSE": 35,
+ "AUTH_CHALLENGE": 36,
+ "AUTH_RESPONSE": 37,
}
func (x BaseCommand_Type) Enum() *BaseCommand_Type {
@@ -710,7 +761,7 @@
return nil
}
func (BaseCommand_Type) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41, 0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{44, 0}
}
type Schema struct {
@@ -727,7 +778,7 @@
func (m *Schema) String() string { return proto.CompactTextString(m) }
func (*Schema) ProtoMessage() {}
func (*Schema) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{0}
}
func (m *Schema) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Schema.Unmarshal(m, b)
@@ -789,7 +840,7 @@
func (m *MessageIdData) String() string { return proto.CompactTextString(m) }
func (*MessageIdData) ProtoMessage() {}
func (*MessageIdData) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{1}
}
func (m *MessageIdData) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MessageIdData.Unmarshal(m, b)
@@ -852,7 +903,7 @@
func (m *KeyValue) String() string { return proto.CompactTextString(m) }
func (*KeyValue) ProtoMessage() {}
func (*KeyValue) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{2}
}
func (m *KeyValue) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_KeyValue.Unmarshal(m, b)
@@ -898,7 +949,7 @@
func (m *KeyLongValue) String() string { return proto.CompactTextString(m) }
func (*KeyLongValue) ProtoMessage() {}
func (*KeyLongValue) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{3}
}
func (m *KeyLongValue) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_KeyLongValue.Unmarshal(m, b)
@@ -945,7 +996,7 @@
func (m *EncryptionKeys) String() string { return proto.CompactTextString(m) }
func (*EncryptionKeys) ProtoMessage() {}
func (*EncryptionKeys) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{4}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{4}
}
func (m *EncryptionKeys) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_EncryptionKeys.Unmarshal(m, b)
@@ -1013,19 +1064,27 @@
// Algorithm used to encrypt data key
EncryptionAlgo *string `protobuf:"bytes,14,opt,name=encryption_algo,json=encryptionAlgo" json:"encryption_algo,omitempty"`
// Additional parameters required by encryption
- EncryptionParam []byte `protobuf:"bytes,15,opt,name=encryption_param,json=encryptionParam" json:"encryption_param,omitempty"`
- SchemaVersion []byte `protobuf:"bytes,16,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
- PartitionKeyB64Encoded *bool `protobuf:"varint,17,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
+ EncryptionParam []byte `protobuf:"bytes,15,opt,name=encryption_param,json=encryptionParam" json:"encryption_param,omitempty"`
+ SchemaVersion []byte `protobuf:"bytes,16,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+ PartitionKeyB64Encoded *bool `protobuf:"varint,17,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+ // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
+ OrderingKey []byte `protobuf:"bytes,18,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
+ // Mark the message to be delivered at or after the specified timestamp
+ DeliverAtTime *int64 `protobuf:"varint,19,opt,name=deliver_at_time,json=deliverAtTime" json:"deliver_at_time,omitempty"`
+ // Identify whether a message is a "marker" message used for
+ // internal metadata instead of application published data.
+ // Markers will generally not be propagated back to clients
+ MarkerType *int32 `protobuf:"varint,20,opt,name=marker_type,json=markerType" json:"marker_type,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
}
func (m *MessageMetadata) Reset() { *m = MessageMetadata{} }
func (m *MessageMetadata) String() string { return proto.CompactTextString(m) }
func (*MessageMetadata) ProtoMessage() {}
func (*MessageMetadata) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{5}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{5}
}
func (m *MessageMetadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MessageMetadata.Unmarshal(m, b)
@@ -1163,6 +1222,27 @@
return Default_MessageMetadata_PartitionKeyB64Encoded
}
+func (m *MessageMetadata) GetOrderingKey() []byte { // GetOrderingKey returns m.OrderingKey and is safe to call on a nil receiver.
+ if m != nil {
+ return m.OrderingKey
+ }
+ return nil // nil receiver: no ordering key to report
+}
+
+func (m *MessageMetadata) GetDeliverAtTime() int64 { // GetDeliverAtTime dereferences the optional DeliverAtTime field.
+ if m != nil && m.DeliverAtTime != nil {
+ return *m.DeliverAtTime
+ }
+ return 0 // receiver or field is nil: fall back to zero
+}
+
+func (m *MessageMetadata) GetMarkerType() int32 { // GetMarkerType dereferences the optional MarkerType field.
+ if m != nil && m.MarkerType != nil {
+ return *m.MarkerType
+ }
+ return 0 // receiver or field is nil: fall back to zero
+}
+
type SingleMessageMetadata struct {
Properties []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
PartitionKey *string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
@@ -1170,18 +1250,20 @@
CompactedOut *bool `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
// the timestamp that this event occurs. it is typically set by applications.
// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
- EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
- PartitionKeyB64Encoded *bool `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
+ EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
+ PartitionKeyB64Encoded *bool `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+ // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
+ OrderingKey []byte `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
}
func (m *SingleMessageMetadata) Reset() { *m = SingleMessageMetadata{} }
func (m *SingleMessageMetadata) String() string { return proto.CompactTextString(m) }
func (*SingleMessageMetadata) ProtoMessage() {}
func (*SingleMessageMetadata) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{6}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{6}
}
func (m *SingleMessageMetadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SingleMessageMetadata.Unmarshal(m, b)
@@ -1247,6 +1329,13 @@
return Default_SingleMessageMetadata_PartitionKeyB64Encoded
}
+func (m *SingleMessageMetadata) GetOrderingKey() []byte { // GetOrderingKey returns m.OrderingKey and is safe to call on a nil receiver.
+ if m != nil {
+ return m.OrderingKey
+ }
+ return nil // nil receiver: no ordering key to report
+}
+
type CommandConnect struct {
ClientVersion *string `protobuf:"bytes,1,req,name=client_version,json=clientVersion" json:"client_version,omitempty"`
AuthMethod *AuthMethod `protobuf:"varint,2,opt,name=auth_method,json=authMethod,enum=pulsar.proto.AuthMethod" json:"auth_method,omitempty"`
@@ -1274,7 +1363,7 @@
func (m *CommandConnect) String() string { return proto.CompactTextString(m) }
func (*CommandConnect) ProtoMessage() {}
func (*CommandConnect) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{7}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{7}
}
func (m *CommandConnect) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandConnect.Unmarshal(m, b)
@@ -1362,6 +1451,7 @@
type CommandConnected struct {
ServerVersion *string `protobuf:"bytes,1,req,name=server_version,json=serverVersion" json:"server_version,omitempty"`
ProtocolVersion *int32 `protobuf:"varint,2,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+ MaxMessageSize *int32 `protobuf:"varint,3,opt,name=max_message_size,json=maxMessageSize" json:"max_message_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -1371,7 +1461,7 @@
func (m *CommandConnected) String() string { return proto.CompactTextString(m) }
func (*CommandConnected) ProtoMessage() {}
func (*CommandConnected) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{8}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{8}
}
func (m *CommandConnected) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandConnected.Unmarshal(m, b)
@@ -1407,6 +1497,172 @@
return Default_CommandConnected_ProtocolVersion
}
+func (m *CommandConnected) GetMaxMessageSize() int32 { // GetMaxMessageSize dereferences the optional MaxMessageSize field.
+ if m != nil && m.MaxMessageSize != nil {
+ return *m.MaxMessageSize
+ }
+ return 0 // receiver or field is nil: fall back to zero
+}
+
+type CommandAuthResponse struct { // CommandAuthResponse has three optional protobuf fields (tags 1-3); read them through the Get* methods.
+ ClientVersion *string `protobuf:"bytes,1,opt,name=client_version,json=clientVersion" json:"client_version,omitempty"`
+ Response *AuthData `protobuf:"bytes,2,opt,name=response" json:"response,omitempty"`
+ ProtocolVersion *int32 `protobuf:"varint,3,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CommandAuthResponse) Reset() { *m = CommandAuthResponse{} } // Reset overwrites *m with the zero value.
+func (m *CommandAuthResponse) String() string { return proto.CompactTextString(m) } // String delegates to proto.CompactTextString.
+func (*CommandAuthResponse) ProtoMessage() {} // ProtoMessage has an empty body.
+func (*CommandAuthResponse) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {9}.
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{9}
+}
+func (m *CommandAuthResponse) XXX_Unmarshal(b []byte) error { // The XXX_ methods forward to xxx_messageInfo_CommandAuthResponse.
+ return xxx_messageInfo_CommandAuthResponse.Unmarshal(m, b)
+}
+func (m *CommandAuthResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandAuthResponse.Marshal(b, m, deterministic)
+}
+func (dst *CommandAuthResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandAuthResponse.Merge(dst, src)
+}
+func (m *CommandAuthResponse) XXX_Size() int {
+ return xxx_messageInfo_CommandAuthResponse.Size(m)
+}
+func (m *CommandAuthResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandAuthResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandAuthResponse proto.InternalMessageInfo
+
+const Default_CommandAuthResponse_ProtocolVersion int32 = 0 // returned by GetProtocolVersion when the field is unset
+
+func (m *CommandAuthResponse) GetClientVersion() string { // GetClientVersion dereferences the optional ClientVersion field.
+ if m != nil && m.ClientVersion != nil {
+ return *m.ClientVersion
+ }
+ return "" // receiver or field is nil: fall back to the empty string
+}
+
+func (m *CommandAuthResponse) GetResponse() *AuthData { // GetResponse returns m.Response and is safe to call on a nil receiver.
+ if m != nil {
+ return m.Response
+ }
+ return nil
+}
+
+func (m *CommandAuthResponse) GetProtocolVersion() int32 { // GetProtocolVersion dereferences the optional ProtocolVersion field.
+ if m != nil && m.ProtocolVersion != nil {
+ return *m.ProtocolVersion
+ }
+ return Default_CommandAuthResponse_ProtocolVersion // receiver or field is nil: use the declared default
+}
+
+type CommandAuthChallenge struct { // CommandAuthChallenge has three optional protobuf fields (tags 1-3); read them through the Get* methods.
+ ServerVersion *string `protobuf:"bytes,1,opt,name=server_version,json=serverVersion" json:"server_version,omitempty"`
+ Challenge *AuthData `protobuf:"bytes,2,opt,name=challenge" json:"challenge,omitempty"`
+ ProtocolVersion *int32 `protobuf:"varint,3,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CommandAuthChallenge) Reset() { *m = CommandAuthChallenge{} } // Reset overwrites *m with the zero value.
+func (m *CommandAuthChallenge) String() string { return proto.CompactTextString(m) } // String delegates to proto.CompactTextString.
+func (*CommandAuthChallenge) ProtoMessage() {} // ProtoMessage has an empty body.
+func (*CommandAuthChallenge) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {10}.
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{10}
+}
+func (m *CommandAuthChallenge) XXX_Unmarshal(b []byte) error { // The XXX_ methods forward to xxx_messageInfo_CommandAuthChallenge.
+ return xxx_messageInfo_CommandAuthChallenge.Unmarshal(m, b)
+}
+func (m *CommandAuthChallenge) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandAuthChallenge.Marshal(b, m, deterministic)
+}
+func (dst *CommandAuthChallenge) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandAuthChallenge.Merge(dst, src)
+}
+func (m *CommandAuthChallenge) XXX_Size() int {
+ return xxx_messageInfo_CommandAuthChallenge.Size(m)
+}
+func (m *CommandAuthChallenge) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandAuthChallenge.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandAuthChallenge proto.InternalMessageInfo
+
+const Default_CommandAuthChallenge_ProtocolVersion int32 = 0 // returned by GetProtocolVersion when the field is unset
+
+func (m *CommandAuthChallenge) GetServerVersion() string { // GetServerVersion dereferences the optional ServerVersion field.
+ if m != nil && m.ServerVersion != nil {
+ return *m.ServerVersion
+ }
+ return "" // receiver or field is nil: fall back to the empty string
+}
+
+func (m *CommandAuthChallenge) GetChallenge() *AuthData { // GetChallenge returns m.Challenge and is safe to call on a nil receiver.
+ if m != nil {
+ return m.Challenge
+ }
+ return nil
+}
+
+func (m *CommandAuthChallenge) GetProtocolVersion() int32 { // GetProtocolVersion dereferences the optional ProtocolVersion field.
+ if m != nil && m.ProtocolVersion != nil {
+ return *m.ProtocolVersion
+ }
+ return Default_CommandAuthChallenge_ProtocolVersion // receiver or field is nil: use the declared default
+}
+
+// AuthData holds an auth method name and raw auth bytes; it is reused to support mutual authentication types such as SASL.
+type AuthData struct {
+ AuthMethodName *string `protobuf:"bytes,1,opt,name=auth_method_name,json=authMethodName" json:"auth_method_name,omitempty"`
+ AuthData []byte `protobuf:"bytes,2,opt,name=auth_data,json=authData" json:"auth_data,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *AuthData) Reset() { *m = AuthData{} } // Reset overwrites *m with the zero value.
+func (m *AuthData) String() string { return proto.CompactTextString(m) } // String delegates to proto.CompactTextString.
+func (*AuthData) ProtoMessage() {} // ProtoMessage has an empty body.
+func (*AuthData) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {11}.
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{11}
+}
+func (m *AuthData) XXX_Unmarshal(b []byte) error { // The XXX_ methods forward to xxx_messageInfo_AuthData.
+ return xxx_messageInfo_AuthData.Unmarshal(m, b)
+}
+func (m *AuthData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_AuthData.Marshal(b, m, deterministic)
+}
+func (dst *AuthData) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_AuthData.Merge(dst, src)
+}
+func (m *AuthData) XXX_Size() int {
+ return xxx_messageInfo_AuthData.Size(m)
+}
+func (m *AuthData) XXX_DiscardUnknown() {
+ xxx_messageInfo_AuthData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_AuthData proto.InternalMessageInfo
+
+func (m *AuthData) GetAuthMethodName() string { // GetAuthMethodName dereferences the optional AuthMethodName field.
+ if m != nil && m.AuthMethodName != nil {
+ return *m.AuthMethodName
+ }
+ return "" // receiver or field is nil: fall back to the empty string
+}
+
+func (m *AuthData) GetAuthData() []byte { // GetAuthData returns m.AuthData and is safe to call on a nil receiver.
+ if m != nil {
+ return m.AuthData
+ }
+ return nil
+}
+
type CommandSubscribe struct {
Topic *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
Subscription *string `protobuf:"bytes,2,req,name=subscription" json:"subscription,omitempty"`
@@ -1428,17 +1684,21 @@
Schema *Schema `protobuf:"bytes,12,opt,name=schema" json:"schema,omitempty"`
// Signal wthether the subscription will initialize on latest
// or not -- earliest
- InitialPosition *CommandSubscribe_InitialPosition `protobuf:"varint,13,opt,name=initialPosition,enum=pulsar.proto.CommandSubscribe_InitialPosition,def=0" json:"initialPosition,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
+ InitialPosition *CommandSubscribe_InitialPosition `protobuf:"varint,13,opt,name=initialPosition,enum=pulsar.proto.CommandSubscribe_InitialPosition,def=0" json:"initialPosition,omitempty"`
+ // Mark the subscription as "replicated". Pulsar will make sure
+ // to periodically sync the state of replicated subscriptions
+ // across different clusters (when using geo-replication).
+ ReplicateSubscriptionState *bool `protobuf:"varint,14,opt,name=replicate_subscription_state,json=replicateSubscriptionState" json:"replicate_subscription_state,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
}
func (m *CommandSubscribe) Reset() { *m = CommandSubscribe{} }
func (m *CommandSubscribe) String() string { return proto.CompactTextString(m) }
func (*CommandSubscribe) ProtoMessage() {}
func (*CommandSubscribe) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{12}
}
func (m *CommandSubscribe) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandSubscribe.Unmarshal(m, b)
@@ -1552,6 +1812,13 @@
return Default_CommandSubscribe_InitialPosition
}
+func (m *CommandSubscribe) GetReplicateSubscriptionState() bool {
+	if m == nil || m.ReplicateSubscriptionState == nil {
+		return false // nil receiver or unset field: report false
+	}
+	return *m.ReplicateSubscriptionState
+}
+
type CommandPartitionedTopicMetadata struct {
Topic *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
@@ -1572,7 +1839,7 @@
func (m *CommandPartitionedTopicMetadata) String() string { return proto.CompactTextString(m) }
func (*CommandPartitionedTopicMetadata) ProtoMessage() {}
func (*CommandPartitionedTopicMetadata) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{10}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{13}
}
func (m *CommandPartitionedTopicMetadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandPartitionedTopicMetadata.Unmarshal(m, b)
@@ -1644,7 +1911,7 @@
func (m *CommandPartitionedTopicMetadataResponse) String() string { return proto.CompactTextString(m) }
func (*CommandPartitionedTopicMetadataResponse) ProtoMessage() {}
func (*CommandPartitionedTopicMetadataResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{14}
}
func (m *CommandPartitionedTopicMetadataResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Unmarshal(m, b)
@@ -1720,7 +1987,7 @@
func (m *CommandLookupTopic) String() string { return proto.CompactTextString(m) }
func (*CommandLookupTopic) ProtoMessage() {}
func (*CommandLookupTopic) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{12}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{15}
}
func (m *CommandLookupTopic) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandLookupTopic.Unmarshal(m, b)
@@ -1805,7 +2072,7 @@
func (m *CommandLookupTopicResponse) String() string { return proto.CompactTextString(m) }
func (*CommandLookupTopicResponse) ProtoMessage() {}
func (*CommandLookupTopicResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{16}
}
func (m *CommandLookupTopicResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandLookupTopicResponse.Unmarshal(m, b)
@@ -1906,7 +2173,7 @@
func (m *CommandProducer) String() string { return proto.CompactTextString(m) }
func (*CommandProducer) ProtoMessage() {}
func (*CommandProducer) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{14}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{17}
}
func (m *CommandProducer) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandProducer.Unmarshal(m, b)
@@ -1990,7 +2257,7 @@
func (m *CommandSend) String() string { return proto.CompactTextString(m) }
func (*CommandSend) ProtoMessage() {}
func (*CommandSend) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{15}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{18}
}
func (m *CommandSend) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandSend.Unmarshal(m, b)
@@ -2046,7 +2313,7 @@
func (m *CommandSendReceipt) String() string { return proto.CompactTextString(m) }
func (*CommandSendReceipt) ProtoMessage() {}
func (*CommandSendReceipt) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{16}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{19}
}
func (m *CommandSendReceipt) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandSendReceipt.Unmarshal(m, b)
@@ -2101,7 +2368,7 @@
func (m *CommandSendError) String() string { return proto.CompactTextString(m) }
func (*CommandSendError) ProtoMessage() {}
func (*CommandSendError) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{17}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{20}
}
func (m *CommandSendError) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandSendError.Unmarshal(m, b)
@@ -2162,7 +2429,7 @@
func (m *CommandMessage) String() string { return proto.CompactTextString(m) }
func (*CommandMessage) ProtoMessage() {}
func (*CommandMessage) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{18}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{21}
}
func (m *CommandMessage) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandMessage.Unmarshal(m, b)
@@ -2221,7 +2488,7 @@
func (m *CommandAck) String() string { return proto.CompactTextString(m) }
func (*CommandAck) ProtoMessage() {}
func (*CommandAck) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{22}
}
func (m *CommandAck) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandAck.Unmarshal(m, b)
@@ -2289,7 +2556,7 @@
func (m *CommandActiveConsumerChange) String() string { return proto.CompactTextString(m) }
func (*CommandActiveConsumerChange) ProtoMessage() {}
func (*CommandActiveConsumerChange) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{20}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{23}
}
func (m *CommandActiveConsumerChange) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandActiveConsumerChange.Unmarshal(m, b)
@@ -2339,7 +2606,7 @@
func (m *CommandFlow) String() string { return proto.CompactTextString(m) }
func (*CommandFlow) ProtoMessage() {}
func (*CommandFlow) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{21}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{24}
}
func (m *CommandFlow) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandFlow.Unmarshal(m, b)
@@ -2385,7 +2652,7 @@
func (m *CommandUnsubscribe) String() string { return proto.CompactTextString(m) }
func (*CommandUnsubscribe) ProtoMessage() {}
func (*CommandUnsubscribe) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{22}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{25}
}
func (m *CommandUnsubscribe) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandUnsubscribe.Unmarshal(m, b)
@@ -2424,6 +2691,7 @@
ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
MessageId *MessageIdData `protobuf:"bytes,3,opt,name=message_id,json=messageId" json:"message_id,omitempty"`
+ MessagePublishTime *uint64 `protobuf:"varint,4,opt,name=message_publish_time,json=messagePublishTime" json:"message_publish_time,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -2433,7 +2701,7 @@
func (m *CommandSeek) String() string { return proto.CompactTextString(m) }
func (*CommandSeek) ProtoMessage() {}
func (*CommandSeek) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{23}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{26}
}
func (m *CommandSeek) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandSeek.Unmarshal(m, b)
@@ -2474,6 +2742,13 @@
return nil
}
+func (m *CommandSeek) GetMessagePublishTime() uint64 {
+	if m == nil || m.MessagePublishTime == nil {
+		return 0 // nil receiver or unset field: report zero
+	}
+	return *m.MessagePublishTime
+}
+
// Message sent by broker to client when a topic
// has been forcefully terminated and there are no more
// messages left to consume
@@ -2488,7 +2763,7 @@
func (m *CommandReachedEndOfTopic) String() string { return proto.CompactTextString(m) }
func (*CommandReachedEndOfTopic) ProtoMessage() {}
func (*CommandReachedEndOfTopic) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{24}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{27}
}
func (m *CommandReachedEndOfTopic) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandReachedEndOfTopic.Unmarshal(m, b)
@@ -2527,7 +2802,7 @@
func (m *CommandCloseProducer) String() string { return proto.CompactTextString(m) }
func (*CommandCloseProducer) ProtoMessage() {}
func (*CommandCloseProducer) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{25}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{28}
}
func (m *CommandCloseProducer) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandCloseProducer.Unmarshal(m, b)
@@ -2573,7 +2848,7 @@
func (m *CommandCloseConsumer) String() string { return proto.CompactTextString(m) }
func (*CommandCloseConsumer) ProtoMessage() {}
func (*CommandCloseConsumer) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{26}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{29}
}
func (m *CommandCloseConsumer) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandCloseConsumer.Unmarshal(m, b)
@@ -2621,7 +2896,7 @@
func (m *CommandRedeliverUnacknowledgedMessages) String() string { return proto.CompactTextString(m) }
func (*CommandRedeliverUnacknowledgedMessages) ProtoMessage() {}
func (*CommandRedeliverUnacknowledgedMessages) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{27}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{30}
}
func (m *CommandRedeliverUnacknowledgedMessages) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Unmarshal(m, b)
@@ -2667,7 +2942,7 @@
func (m *CommandSuccess) String() string { return proto.CompactTextString(m) }
func (*CommandSuccess) ProtoMessage() {}
func (*CommandSuccess) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{28}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{31}
}
func (m *CommandSuccess) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandSuccess.Unmarshal(m, b)
@@ -2718,7 +2993,7 @@
func (m *CommandProducerSuccess) String() string { return proto.CompactTextString(m) }
func (*CommandProducerSuccess) ProtoMessage() {}
func (*CommandProducerSuccess) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{29}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{32}
}
func (m *CommandProducerSuccess) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandProducerSuccess.Unmarshal(m, b)
@@ -2781,7 +3056,7 @@
func (m *CommandError) String() string { return proto.CompactTextString(m) }
func (*CommandError) ProtoMessage() {}
func (*CommandError) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{30}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{33}
}
func (m *CommandError) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandError.Unmarshal(m, b)
@@ -2835,7 +3110,7 @@
func (m *CommandPing) String() string { return proto.CompactTextString(m) }
func (*CommandPing) ProtoMessage() {}
func (*CommandPing) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{31}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{34}
}
func (m *CommandPing) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandPing.Unmarshal(m, b)
@@ -2865,7 +3140,7 @@
func (m *CommandPong) String() string { return proto.CompactTextString(m) }
func (*CommandPong) ProtoMessage() {}
func (*CommandPong) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{32}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{35}
}
func (m *CommandPong) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandPong.Unmarshal(m, b)
@@ -2899,7 +3174,7 @@
func (m *CommandConsumerStats) String() string { return proto.CompactTextString(m) }
func (*CommandConsumerStats) ProtoMessage() {}
func (*CommandConsumerStats) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{33}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{36}
}
func (m *CommandConsumerStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandConsumerStats.Unmarshal(m, b)
@@ -2970,7 +3245,7 @@
func (m *CommandConsumerStatsResponse) String() string { return proto.CompactTextString(m) }
func (*CommandConsumerStatsResponse) ProtoMessage() {}
func (*CommandConsumerStatsResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{34}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{37}
}
func (m *CommandConsumerStatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandConsumerStatsResponse.Unmarshal(m, b)
@@ -3107,7 +3382,7 @@
func (m *CommandGetLastMessageId) String() string { return proto.CompactTextString(m) }
func (*CommandGetLastMessageId) ProtoMessage() {}
func (*CommandGetLastMessageId) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{35}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{38}
}
func (m *CommandGetLastMessageId) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandGetLastMessageId.Unmarshal(m, b)
@@ -3153,7 +3428,7 @@
func (m *CommandGetLastMessageIdResponse) String() string { return proto.CompactTextString(m) }
func (*CommandGetLastMessageIdResponse) ProtoMessage() {}
func (*CommandGetLastMessageIdResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{36}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{39}
}
func (m *CommandGetLastMessageIdResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandGetLastMessageIdResponse.Unmarshal(m, b)
@@ -3200,7 +3475,7 @@
func (m *CommandGetTopicsOfNamespace) String() string { return proto.CompactTextString(m) }
func (*CommandGetTopicsOfNamespace) ProtoMessage() {}
func (*CommandGetTopicsOfNamespace) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{40}
}
func (m *CommandGetTopicsOfNamespace) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandGetTopicsOfNamespace.Unmarshal(m, b)
@@ -3255,7 +3530,7 @@
func (m *CommandGetTopicsOfNamespaceResponse) String() string { return proto.CompactTextString(m) }
func (*CommandGetTopicsOfNamespaceResponse) ProtoMessage() {}
func (*CommandGetTopicsOfNamespaceResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{38}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{41}
}
func (m *CommandGetTopicsOfNamespaceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Unmarshal(m, b)
@@ -3302,7 +3577,7 @@
func (m *CommandGetSchema) String() string { return proto.CompactTextString(m) }
func (*CommandGetSchema) ProtoMessage() {}
func (*CommandGetSchema) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{39}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{42}
}
func (m *CommandGetSchema) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandGetSchema.Unmarshal(m, b)
@@ -3358,7 +3633,7 @@
func (m *CommandGetSchemaResponse) String() string { return proto.CompactTextString(m) }
func (*CommandGetSchemaResponse) ProtoMessage() {}
func (*CommandGetSchemaResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{40}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{43}
}
func (m *CommandGetSchemaResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CommandGetSchemaResponse.Unmarshal(m, b)
@@ -3449,6 +3724,8 @@
GetTopicsOfNamespaceResponse *CommandGetTopicsOfNamespaceResponse `protobuf:"bytes,33,opt,name=getTopicsOfNamespaceResponse" json:"getTopicsOfNamespaceResponse,omitempty"`
GetSchema *CommandGetSchema `protobuf:"bytes,34,opt,name=getSchema" json:"getSchema,omitempty"`
GetSchemaResponse *CommandGetSchemaResponse `protobuf:"bytes,35,opt,name=getSchemaResponse" json:"getSchemaResponse,omitempty"`
+ AuthChallenge *CommandAuthChallenge `protobuf:"bytes,36,opt,name=authChallenge" json:"authChallenge,omitempty"`
+ AuthResponse *CommandAuthResponse `protobuf:"bytes,37,opt,name=authResponse" json:"authResponse,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -3458,7 +3735,7 @@
func (m *BaseCommand) String() string { return proto.CompactTextString(m) }
func (*BaseCommand) ProtoMessage() {}
func (*BaseCommand) Descriptor() ([]byte, []int) {
- return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41}
+ return fileDescriptor_PulsarApi_976de857822f53b5, []int{44}
}
func (m *BaseCommand) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BaseCommand.Unmarshal(m, b)
@@ -3723,6 +4000,20 @@
return nil
}
+func (m *BaseCommand) GetAuthChallenge() *CommandAuthChallenge {
+	if m == nil {
+		return nil // nil receiver: no AuthChallenge to hand back
+	}
+	return m.AuthChallenge
+}
+
+func (m *BaseCommand) GetAuthResponse() *CommandAuthResponse {
+	if m == nil {
+		return nil // nil receiver: no AuthResponse to hand back
+	}
+	return m.AuthResponse
+}
+
func init() {
proto.RegisterType((*Schema)(nil), "pulsar.proto.Schema")
proto.RegisterType((*MessageIdData)(nil), "pulsar.proto.MessageIdData")
@@ -3733,6 +4024,9 @@
proto.RegisterType((*SingleMessageMetadata)(nil), "pulsar.proto.SingleMessageMetadata")
proto.RegisterType((*CommandConnect)(nil), "pulsar.proto.CommandConnect")
proto.RegisterType((*CommandConnected)(nil), "pulsar.proto.CommandConnected")
+ proto.RegisterType((*CommandAuthResponse)(nil), "pulsar.proto.CommandAuthResponse")
+ proto.RegisterType((*CommandAuthChallenge)(nil), "pulsar.proto.CommandAuthChallenge")
+ proto.RegisterType((*AuthData)(nil), "pulsar.proto.AuthData")
proto.RegisterType((*CommandSubscribe)(nil), "pulsar.proto.CommandSubscribe")
proto.RegisterType((*CommandPartitionedTopicMetadata)(nil), "pulsar.proto.CommandPartitionedTopicMetadata")
proto.RegisterType((*CommandPartitionedTopicMetadataResponse)(nil), "pulsar.proto.CommandPartitionedTopicMetadataResponse")
@@ -3781,263 +4075,285 @@
proto.RegisterEnum("pulsar.proto.BaseCommand_Type", BaseCommand_Type_name, BaseCommand_Type_value)
}
-func init() { proto.RegisterFile("PulsarApi.proto", fileDescriptor_PulsarApi_9ff3bd5e091a4809) }
+func init() { proto.RegisterFile("PulsarApi.proto", fileDescriptor_PulsarApi_976de857822f53b5) }
-var fileDescriptor_PulsarApi_9ff3bd5e091a4809 = []byte{
- // 4065 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x3b, 0x4d, 0x6f, 0x1b, 0x49,
- 0x76, 0x6e, 0x7e, 0x48, 0xe4, 0xe3, 0x57, 0xb9, 0x2c, 0x6b, 0xda, 0x1f, 0x63, 0xd3, 0xed, 0xb5,
- 0x47, 0xe3, 0x1d, 0x2b, 0xb6, 0xec, 0x75, 0x66, 0xbc, 0x9b, 0x60, 0x28, 0xaa, 0x6d, 0x33, 0x96,
- 0x48, 0x6d, 0x91, 0xf2, 0x62, 0x27, 0xbb, 0xe8, 0x6d, 0x75, 0x97, 0xa9, 0x86, 0x9a, 0xdd, 0x4c,
- 0x7f, 0x68, 0xac, 0x39, 0xe4, 0x36, 0x08, 0x02, 0x04, 0x08, 0x90, 0x1c, 0x73, 0xc8, 0x21, 0x08,
- 0x72, 0xce, 0x2d, 0x40, 0x7e, 0x43, 0x6e, 0xb9, 0xe4, 0x94, 0x4b, 0x72, 0x0c, 0x90, 0x43, 0x90,
- 0x73, 0x50, 0xd5, 0xd5, 0x5f, 0x24, 0x45, 0xca, 0x3b, 0x7b, 0xc8, 0x89, 0xdd, 0xaf, 0x5e, 0xbd,
- 0x7a, 0xf5, 0xde, 0xab, 0xf7, 0x55, 0x4d, 0x68, 0x1d, 0x86, 0xb6, 0xaf, 0x7b, 0x9d, 0xa9, 0xb5,
- 0x3d, 0xf5, 0xdc, 0xc0, 0xc5, 0xf5, 0x29, 0x07, 0x44, 0x6f, 0xca, 0x7f, 0x48, 0xb0, 0x36, 0x34,
- 0x4e, 0xe8, 0x44, 0xc7, 0x18, 0x4a, 0x8e, 0x3e, 0xa1, 0xb2, 0xd4, 0x2e, 0x6c, 0x55, 0x09, 0x7f,
- 0xc6, 0x77, 0xa1, 0xe6, 0xf3, 0x51, 0xcd, 0xd4, 0x03, 0x5d, 0x2e, 0xb6, 0x0b, 0x5b, 0x75, 0x02,
- 0x11, 0x68, 0x4f, 0x0f, 0x74, 0xfc, 0x18, 0x4a, 0xc1, 0xf9, 0x94, 0xca, 0xa5, 0x76, 0x61, 0xab,
- 0xb9, 0x73, 0x63, 0x3b, 0x4b, 0x7c, 0x3b, 0x22, 0xbc, 0x3d, 0x3a, 0x9f, 0x52, 0xc2, 0xd1, 0xf0,
- 0x0b, 0x80, 0xa9, 0xe7, 0x4e, 0xa9, 0x17, 0x58, 0xd4, 0x97, 0xcb, 0xed, 0xe2, 0x56, 0x6d, 0x67,
- 0x33, 0x3f, 0xe9, 0x2d, 0x3d, 0x7f, 0xa7, 0xdb, 0x21, 0x25, 0x19, 0x4c, 0xe5, 0x0f, 0xa1, 0xc4,
- 0xa8, 0xe0, 0x0a, 0x94, 0xfa, 0xae, 0x43, 0xd1, 0x15, 0x0c, 0xb0, 0x36, 0x0c, 0x3c, 0xcb, 0x19,
- 0x23, 0x89, 0x41, 0xff, 0xc8, 0x77, 0x1d, 0x54, 0xc0, 0x75, 0xa8, 0x1c, 0x32, 0x2a, 0xc7, 0xe1,
- 0x7b, 0x54, 0x64, 0xf0, 0xce, 0x99, 0xe7, 0xa2, 0x92, 0xf2, 0x17, 0x12, 0x34, 0x0e, 0xa8, 0xef,
- 0xeb, 0x63, 0xda, 0x33, 0x39, 0xe3, 0x37, 0xa1, 0x62, 0x53, 0x73, 0x4c, 0xbd, 0x9e, 0xc9, 0x77,
- 0x5c, 0x22, 0xc9, 0x3b, 0x96, 0x61, 0x9d, 0x3a, 0x81, 0x77, 0xde, 0x33, 0xe5, 0x02, 0x1f, 0x8a,
- 0x5f, 0x71, 0x1b, 0xaa, 0x53, 0xdd, 0x0b, 0xac, 0xc0, 0x72, 0x1d, 0xb9, 0xd8, 0x96, 0xb6, 0xca,
- 0x2f, 0x0b, 0x8f, 0x9f, 0x92, 0x14, 0x88, 0xef, 0x43, 0xed, 0x58, 0x0f, 0x8c, 0x13, 0xcd, 0x72,
- 0x4c, 0xfa, 0x41, 0x2e, 0x25, 0x38, 0xc0, 0xc1, 0x3d, 0x06, 0x55, 0x76, 0xa0, 0x12, 0x6f, 0x13,
- 0x23, 0x28, 0x9e, 0xd2, 0x73, 0x21, 0x75, 0xf6, 0x88, 0x37, 0xa0, 0x7c, 0xc6, 0x86, 0xf8, 0xe2,
- 0x55, 0x12, 0xbd, 0x28, 0x2f, 0xa0, 0xfe, 0x96, 0x9e, 0xef, 0xbb, 0xce, 0xf8, 0x52, 0xf3, 0x4a,
- 0xf1, 0x3c, 0x1b, 0x9a, 0xaa, 0x63, 0x78, 0xe7, 0x53, 0xc6, 0xde, 0x5b, 0x7a, 0xee, 0xaf, 0x9a,
- 0x59, 0x17, 0x33, 0xf1, 0x0e, 0x54, 0x26, 0x34, 0xd0, 0x85, 0xe6, 0x97, 0xa9, 0x2a, 0xc1, 0x53,
- 0xfe, 0xb7, 0x0c, 0x2d, 0x21, 0xe8, 0x03, 0x01, 0xc3, 0xf7, 0xa1, 0x31, 0xf5, 0x5c, 0x33, 0x34,
- 0xa8, 0xa7, 0x65, 0x2c, 0xac, 0x1e, 0x03, 0xfb, 0xb1, 0xa5, 0xd1, 0x3f, 0x09, 0xa9, 0x63, 0x50,
- 0xcd, 0x8a, 0xe5, 0x0e, 0x31, 0xa8, 0x67, 0xe2, 0x7b, 0x50, 0x9f, 0x86, 0xc7, 0xb6, 0xe5, 0x9f,
- 0x68, 0x81, 0x35, 0xa1, 0xdc, 0x16, 0x4b, 0xa4, 0x26, 0x60, 0x23, 0x6b, 0x32, 0x6b, 0x5d, 0xa5,
- 0xcb, 0x5a, 0x17, 0xfe, 0x0c, 0x5a, 0x1e, 0x9d, 0xda, 0x96, 0xa1, 0x07, 0xd4, 0xd4, 0xde, 0x7b,
- 0xee, 0x44, 0x2e, 0xb7, 0xa5, 0xad, 0x2a, 0x69, 0xa6, 0xe0, 0x57, 0x9e, 0x3b, 0xe1, 0x3b, 0x89,
- 0x35, 0xad, 0x31, 0x19, 0xae, 0x71, 0xb4, 0x7a, 0x02, 0x7c, 0x4b, 0xcf, 0x19, 0xa3, 0xc9, 0x34,
- 0x2d, 0x70, 0xe5, 0xf5, 0x76, 0x71, 0xab, 0x4a, 0x6a, 0x09, 0x6c, 0xe4, 0x62, 0x15, 0x6a, 0x86,
- 0x3b, 0x99, 0x7a, 0xd4, 0xf7, 0x99, 0x21, 0x55, 0xda, 0xd2, 0x56, 0x73, 0xe7, 0xd3, 0x3c, 0xa7,
- 0xdd, 0x14, 0x81, 0x99, 0xfe, 0xcb, 0x52, 0x7f, 0xd0, 0x57, 0x49, 0x76, 0x1e, 0xde, 0x86, 0xab,
- 0xa1, 0x13, 0x03, 0xa8, 0xa9, 0xf9, 0xd6, 0x77, 0x54, 0xae, 0xb6, 0xa5, 0xad, 0xc6, 0x4b, 0xe9,
- 0x09, 0x41, 0xd9, 0xb1, 0xa1, 0xf5, 0x1d, 0xc5, 0xcf, 0xe1, 0xba, 0x13, 0x4e, 0xb4, 0x49, 0xa4,
- 0x1f, 0x5f, 0xb3, 0x1c, 0x8d, 0x1b, 0xa5, 0x5c, 0xe3, 0x56, 0x2a, 0x3d, 0x25, 0xd8, 0x09, 0x27,
- 0x42, 0x7d, 0x7e, 0xcf, 0xd9, 0x65, 0x83, 0xb8, 0x0d, 0x40, 0xcf, 0xa8, 0x13, 0x44, 0x62, 0xaf,
- 0xb7, 0xa5, 0xad, 0x12, 0x23, 0x5f, 0xe5, 0x40, 0x2e, 0x77, 0x15, 0x5a, 0x34, 0x31, 0x31, 0x26,
- 0x17, 0x5f, 0x6e, 0x70, 0xe1, 0xdf, 0xce, 0x6f, 0x29, 0x6f, 0x87, 0xa4, 0x49, 0xf3, 0x76, 0xf9,
- 0x59, 0x8e, 0x8c, 0x6e, 0x8f, 0x5d, 0xb9, 0x19, 0xa9, 0x21, 0x05, 0x77, 0xec, 0xb1, 0x8b, 0x3f,
- 0x07, 0x94, 0x41, 0x9c, 0xea, 0x9e, 0x3e, 0x91, 0x5b, 0x6d, 0x69, 0xab, 0x4e, 0x32, 0x04, 0x0e,
- 0x19, 0x18, 0x3f, 0x80, 0xa6, 0x70, 0x60, 0x67, 0xd4, 0xe3, 0xc2, 0x46, 0x1c, 0xb1, 0x11, 0x41,
- 0xdf, 0x45, 0x40, 0xfc, 0x35, 0xdc, 0xc8, 0x29, 0x56, 0x3b, 0x7e, 0xf1, 0x5c, 0xa3, 0x8e, 0xe1,
- 0x9a, 0xd4, 0x94, 0xaf, 0xb6, 0xa5, 0xad, 0xca, 0xcb, 0xf2, 0x7b, 0xdd, 0xf6, 0x29, 0xd9, 0xcc,
- 0xea, 0x7a, 0xf7, 0xc5, 0x73, 0x35, 0x42, 0x52, 0xfe, 0xa1, 0x00, 0xd7, 0x87, 0x96, 0x33, 0xb6,
- 0xe9, 0xac, 0xf9, 0xe7, 0xad, 0x52, 0xba, 0xb4, 0x55, 0xce, 0x19, 0x5b, 0x61, 0xb1, 0xb1, 0x4d,
- 0xf5, 0x73, 0xdb, 0xd5, 0x85, 0xf6, 0xd9, 0xa9, 0x28, 0x93, 0x9a, 0x80, 0x71, 0xad, 0x3f, 0x82,
- 0x06, 0xb3, 0x03, 0xdd, 0x60, 0xc6, 0xed, 0x86, 0x01, 0xf7, 0x49, 0xc9, 0x7e, 0xea, 0xc9, 0xd8,
- 0x20, 0x0c, 0x66, 0x74, 0x5d, 0x5e, 0xa0, 0xeb, 0xa5, 0x92, 0x5a, 0xbb, 0x8c, 0xa4, 0xfe, 0xbe,
- 0x08, 0xcd, 0xae, 0x3b, 0x99, 0xe8, 0x8e, 0xd9, 0x75, 0x1d, 0x87, 0x1a, 0x01, 0xd3, 0x92, 0x61,
- 0x5b, 0x6c, 0xdd, 0x58, 0x4b, 0x91, 0x8b, 0x68, 0x44, 0xd0, 0x58, 0x4b, 0x5f, 0x41, 0x4d, 0x0f,
- 0x83, 0x13, 0x6d, 0x42, 0x83, 0x13, 0xd7, 0xe4, 0xf2, 0x68, 0xee, 0xc8, 0x79, 0x51, 0x76, 0xc2,
- 0xe0, 0xe4, 0x80, 0x8f, 0x13, 0xd0, 0x93, 0x67, 0xbc, 0x05, 0x28, 0x33, 0x35, 0x72, 0x43, 0xe2,
- 0x8c, 0xa7, 0x58, 0xdc, 0x11, 0xdd, 0x82, 0x2a, 0xc7, 0x14, 0x6e, 0x8f, 0x19, 0x4b, 0x85, 0x01,
- 0x78, 0xd4, 0xf8, 0x02, 0x10, 0x5f, 0xc6, 0x70, 0xed, 0x84, 0xd5, 0xc8, 0xc5, 0x4b, 0x4f, 0x48,
- 0x2b, 0x1e, 0x8a, 0xf9, 0x7d, 0x0c, 0xd7, 0xa6, 0x9e, 0xfb, 0xe1, 0x5c, 0x0b, 0x5c, 0xed, 0xd8,
- 0x73, 0x4f, 0xa9, 0xa7, 0x85, 0x9e, 0x2d, 0x9c, 0x06, 0xe2, 0x43, 0x23, 0x77, 0x97, 0x0f, 0x1c,
- 0x79, 0x36, 0x7e, 0x0c, 0xd8, 0xf5, 0xac, 0xb1, 0xe5, 0xe8, 0xb6, 0x36, 0xf5, 0x2c, 0xc7, 0xb0,
- 0xa6, 0xba, 0x2d, 0xaf, 0x73, 0xec, 0xab, 0xf1, 0xc8, 0x61, 0x3c, 0x80, 0xbf, 0xc8, 0xa0, 0xa7,
- 0x1c, 0x57, 0x22, 0xe2, 0xf1, 0x48, 0x27, 0xe6, 0xfc, 0x09, 0x6c, 0xe4, 0xb1, 0x85, 0x10, 0xab,
- 0x1c, 0x1f, 0x67, 0xf1, 0x23, 0x61, 0x28, 0x63, 0x40, 0x79, 0x35, 0x51, 0x93, 0x1f, 0x27, 0xea,
- 0x9d, 0x51, 0x6f, 0x56, 0x51, 0x11, 0x34, 0xde, 0xf8, 0x22, 0x31, 0x15, 0x2e, 0x12, 0x93, 0xf2,
- 0x2f, 0xe5, 0x64, 0xa5, 0x61, 0x78, 0xec, 0x1b, 0x9e, 0x75, 0x4c, 0x59, 0x48, 0x0a, 0xdc, 0xa9,
- 0x65, 0x88, 0x05, 0xa2, 0x17, 0xac, 0x40, 0xdd, 0x8f, 0x50, 0xf8, 0x19, 0x17, 0x11, 0x32, 0x07,
- 0xc3, 0x5f, 0xc3, 0xba, 0x1f, 0x1e, 0x33, 0x9f, 0xc9, 0x4f, 0x43, 0x73, 0xe7, 0xe1, 0x9c, 0x63,
- 0xcd, 0x2d, 0xb5, 0x3d, 0x8c, 0xb0, 0x49, 0x3c, 0x8d, 0xc5, 0x22, 0xc3, 0x75, 0xfc, 0x70, 0x42,
- 0x3d, 0x16, 0x8b, 0x4a, 0x51, 0x2c, 0x8a, 0x41, 0x3d, 0x13, 0x7f, 0x0a, 0xe0, 0xb1, 0xc8, 0xe4,
- 0x07, 0x6c, 0xbc, 0xcc, 0xc7, 0xab, 0x02, 0xd2, 0x33, 0xd9, 0xc9, 0x4d, 0xe6, 0x73, 0x4b, 0x13,
- 0x61, 0x22, 0x06, 0x72, 0x3b, 0x7b, 0x00, 0xcd, 0xa9, 0x67, 0xb9, 0x9e, 0x15, 0x9c, 0x6b, 0x36,
- 0x3d, 0xa3, 0x91, 0xa6, 0xcb, 0xa4, 0x11, 0x43, 0xf7, 0x19, 0x10, 0xdf, 0x81, 0x75, 0x33, 0xf4,
- 0xf4, 0x63, 0x9b, 0x72, 0xd5, 0x56, 0x5e, 0x96, 0x02, 0x2f, 0xa4, 0x24, 0x06, 0x62, 0x15, 0x90,
- 0x1f, 0xe8, 0x5e, 0x10, 0x7b, 0x75, 0xc6, 0x10, 0xd3, 0x69, 0x6d, 0xe7, 0x56, 0x7e, 0xdb, 0xb9,
- 0xf4, 0x87, 0x34, 0xf9, 0xa4, 0x04, 0x96, 0x8b, 0xf5, 0x70, 0xb9, 0x58, 0xcf, 0x76, 0xe0, 0x51,
- 0xdd, 0xd4, 0x12, 0x0f, 0xc2, 0xe3, 0x48, 0x85, 0x34, 0x18, 0xb4, 0x1b, 0x03, 0xf1, 0x17, 0xb0,
- 0x16, 0x39, 0x5b, 0x1e, 0x3b, 0x6a, 0x3b, 0x1b, 0x8b, 0x92, 0x44, 0x22, 0x70, 0xf0, 0x6f, 0xa0,
- 0x65, 0x39, 0x56, 0x60, 0xe9, 0xf6, 0xa1, 0xeb, 0x47, 0x79, 0x56, 0x83, 0x9f, 0xf3, 0xed, 0x15,
- 0x5a, 0xec, 0xe5, 0x67, 0xbd, 0x5c, 0xdb, 0xd7, 0x03, 0xea, 0x07, 0x64, 0x96, 0x9c, 0xb2, 0x03,
- 0xeb, 0x42, 0xe3, 0xb8, 0x01, 0x55, 0xf5, 0x83, 0x61, 0x87, 0xbe, 0x75, 0x16, 0xe7, 0x94, 0x27,
- 0xba, 0x47, 0x4d, 0x24, 0xb1, 0x4c, 0xf2, 0x95, 0x6e, 0xd9, 0xee, 0x19, 0xf5, 0x50, 0x41, 0xf9,
- 0x31, 0xb4, 0x66, 0xe8, 0x33, 0xe4, 0x68, 0x05, 0x74, 0x85, 0x21, 0xab, 0xba, 0x67, 0x5b, 0xec,
- 0x4d, 0x52, 0xfe, 0x53, 0x82, 0xbb, 0x82, 0xbd, 0xc3, 0xd8, 0x05, 0x52, 0x73, 0xc4, 0x0c, 0x38,
- 0x09, 0x0a, 0x8b, 0xcd, 0x3b, 0x6f, 0x57, 0x85, 0x59, 0xbb, 0x5a, 0xec, 0x20, 0x8a, 0x1f, 0xe7,
- 0x20, 0x4a, 0x1f, 0xe9, 0x20, 0xca, 0x17, 0x3a, 0x88, 0x7f, 0x2a, 0xc0, 0x67, 0x2b, 0xf6, 0x49,
- 0xa8, 0x3f, 0x75, 0x1d, 0x9f, 0xe2, 0x3b, 0x00, 0x49, 0x38, 0x60, 0x41, 0x50, 0xda, 0x6a, 0x90,
- 0x0c, 0x64, 0xd5, 0xce, 0x7f, 0x05, 0x15, 0x4f, 0x90, 0xe2, 0xfb, 0x6d, 0xee, 0x7c, 0xbd, 0xd0,
- 0x1c, 0x56, 0xf1, 0xb1, 0xbd, 0xef, 0xba, 0xa7, 0xe1, 0x94, 0x1f, 0xf7, 0x84, 0x22, 0xfe, 0x3d,
- 0x28, 0x53, 0xcf, 0x73, 0x3d, 0x2e, 0x9b, 0xf9, 0x2a, 0x86, 0xbb, 0x36, 0x95, 0x21, 0x90, 0x08,
- 0x8f, 0x15, 0x08, 0xe2, 0xb8, 0x09, 0xf1, 0xc4, 0xaf, 0xca, 0x03, 0x80, 0x74, 0x09, 0x5c, 0x63,
- 0xa6, 0x66, 0x18, 0xd4, 0xf7, 0x23, 0xeb, 0x62, 0x16, 0xc5, 0xac, 0x4b, 0xf9, 0xbe, 0x00, 0x58,
- 0xb0, 0x2c, 0xd0, 0xb9, 0xfe, 0x7f, 0x2b, 0xab, 0xf8, 0x31, 0x34, 0x98, 0xbe, 0x98, 0xcf, 0xd0,
- 0x03, 0xeb, 0x2c, 0x12, 0x50, 0x12, 0x85, 0xf3, 0x63, 0x17, 0x98, 0x50, 0xe9, 0xe3, 0x4c, 0xa8,
- 0xfc, 0x91, 0x26, 0xb4, 0x76, 0xa1, 0x09, 0xfd, 0x5b, 0x11, 0x6e, 0xce, 0xcb, 0x21, 0xb1, 0x9a,
- 0x47, 0x80, 0xa2, 0xb8, 0xc9, 0x74, 0x60, 0x19, 0xf4, 0xc8, 0xb3, 0xb9, 0xed, 0x54, 0xc9, 0x1c,
- 0x1c, 0x3f, 0x81, 0x6b, 0xb3, 0xb0, 0x91, 0xed, 0x8b, 0xa4, 0x69, 0xd1, 0x10, 0x1e, 0xcc, 0x19,
- 0xd5, 0xb3, 0x85, 0x46, 0xb5, 0x80, 0xb3, 0xc5, 0x76, 0x94, 0x57, 0x54, 0x69, 0xa5, 0xa2, 0xca,
- 0x4b, 0x14, 0x95, 0xd8, 0xe4, 0xda, 0xc7, 0xdb, 0xe4, 0x7a, 0xce, 0x26, 0x79, 0xca, 0x16, 0xa5,
- 0x21, 0x27, 0x9e, 0x1b, 0x8e, 0x4f, 0x34, 0x3f, 0x12, 0x03, 0x4f, 0x46, 0x2a, 0xf9, 0x94, 0x8d,
- 0xe7, 0x24, 0x11, 0x5a, 0x2a, 0x2c, 0xe5, 0x59, 0xce, 0xaa, 0xeb, 0x50, 0x21, 0xd4, 0xb4, 0x3c,
- 0x6a, 0x30, 0xdf, 0x57, 0x83, 0x75, 0x91, 0x1f, 0x20, 0x29, 0x63, 0xe3, 0x05, 0xe5, 0xaf, 0x0b,
- 0xd0, 0x8a, 0x8f, 0xa5, 0xa8, 0xf4, 0x2e, 0x30, 0xf0, 0xbb, 0x50, 0x4b, 0x0a, 0xc4, 0xb4, 0xf6,
- 0x8b, 0x41, 0x73, 0xf1, 0xb6, 0xb8, 0x20, 0xde, 0xe6, 0x0b, 0xcc, 0x92, 0xc8, 0x94, 0xb3, 0x05,
- 0xe6, 0x7d, 0xa8, 0x8a, 0xe2, 0x80, 0x9a, 0x79, 0xc9, 0xa7, 0xf0, 0x5c, 0x18, 0x5c, 0xbb, 0x64,
- 0x18, 0x4c, 0xe3, 0xdb, 0xfa, 0xea, 0xf8, 0xa6, 0x84, 0x50, 0x8b, 0x43, 0x17, 0x75, 0xcc, 0xd9,
- 0xad, 0x4b, 0x73, 0x5b, 0x5f, 0x59, 0x17, 0xff, 0x08, 0xea, 0xd9, 0xa2, 0x4e, 0x74, 0x25, 0xa4,
- 0xa7, 0xa4, 0x96, 0xa9, 0xe5, 0x94, 0xbf, 0x92, 0x12, 0x87, 0xc3, 0xd6, 0x25, 0xd4, 0xa0, 0xd6,
- 0x34, 0xf8, 0x1d, 0x2c, 0xff, 0x12, 0x20, 0x93, 0x79, 0x14, 0x57, 0x67, 0x1e, 0xd5, 0x49, 0xfc,
- 0xaa, 0xfc, 0xad, 0x94, 0x26, 0x7e, 0xd4, 0x31, 0xb9, 0x39, 0xff, 0x0e, 0x58, 0x4a, 0x8e, 0x4e,
- 0x71, 0x61, 0x53, 0x6a, 0xe9, 0xd1, 0x29, 0x71, 0xbb, 0x4c, 0xdc, 0xf9, 0xdf, 0x48, 0x49, 0xad,
- 0x22, 0x76, 0x31, 0x9b, 0x1c, 0x4a, 0x73, 0xc9, 0x61, 0x5e, 0x22, 0x8c, 0xbd, 0x4b, 0x4b, 0x84,
- 0x25, 0xce, 0x1e, 0x35, 0xa9, 0x6d, 0x9d, 0x51, 0xef, 0x5c, 0x33, 0xdc, 0xd0, 0x09, 0xb8, 0x4c,
- 0x79, 0x41, 0xdf, 0x4a, 0x87, 0xba, 0x6c, 0x44, 0xf9, 0x9f, 0x22, 0x80, 0xe0, 0xae, 0x63, 0x9c,
- 0xae, 0xe6, 0xec, 0xa7, 0x50, 0xd1, 0x8d, 0x53, 0x8d, 0x37, 0xec, 0x0a, 0x5c, 0x36, 0xed, 0x85,
- 0x0e, 0xaf, 0x63, 0x9c, 0x6e, 0x77, 0x8c, 0xd3, 0x28, 0x29, 0xd6, 0xa3, 0x87, 0x39, 0x45, 0x17,
- 0x3f, 0x62, 0x5b, 0x43, 0x40, 0x67, 0xba, 0x6d, 0x99, 0x3a, 0xaf, 0x1a, 0xb3, 0xb1, 0x76, 0xeb,
- 0x42, 0x06, 0xde, 0x25, 0x13, 0x22, 0x5d, 0xb5, 0xce, 0xf2, 0x00, 0xc6, 0xd0, 0x5c, 0x2f, 0xf1,
- 0xe6, 0xdc, 0x69, 0x4d, 0x1a, 0x66, 0xb9, 0x7e, 0xe2, 0xe7, 0xb0, 0x2e, 0x36, 0x88, 0x9b, 0x00,
- 0x3d, 0xc7, 0xb4, 0xce, 0x2c, 0x33, 0xd4, 0x6d, 0x74, 0x85, 0xbd, 0x77, 0xc3, 0x49, 0x68, 0x73,
- 0x37, 0x8c, 0x24, 0xe5, 0x2f, 0x25, 0x68, 0xcd, 0xf0, 0x82, 0xef, 0xc0, 0xcd, 0xa3, 0x99, 0xe6,
- 0x4a, 0xd7, 0xf5, 0xbc, 0x90, 0x17, 0x20, 0xe8, 0x0a, 0xde, 0x04, 0xbc, 0x47, 0x33, 0x9d, 0x1a,
- 0x3e, 0x0b, 0x49, 0x78, 0x03, 0x50, 0xf7, 0x84, 0x1a, 0xa7, 0x7e, 0x38, 0x39, 0xb0, 0xfc, 0x89,
- 0x1e, 0x18, 0x27, 0xa8, 0x80, 0x6f, 0xc0, 0x75, 0xde, 0x69, 0xd9, 0xa3, 0x43, 0xea, 0x59, 0xba,
- 0x6d, 0x7d, 0x47, 0xa3, 0x09, 0x45, 0x7c, 0x0d, 0x5a, 0x7b, 0x34, 0xee, 0x68, 0x44, 0xc0, 0x92,
- 0x72, 0x0c, 0xb7, 0x12, 0x39, 0x31, 0x26, 0xbb, 0x42, 0xc3, 0xdd, 0x13, 0xdd, 0xb9, 0x8c, 0x81,
- 0x2a, 0x50, 0xb5, 0x7c, 0x4d, 0xe7, 0x73, 0x79, 0x7c, 0x4c, 0x3c, 0x61, 0xc5, 0xf2, 0x23, 0x92,
- 0xca, 0xbb, 0xc4, 0x4d, 0xbd, 0xb2, 0xdd, 0x6f, 0x57, 0xd3, 0x7c, 0x08, 0x4d, 0xa1, 0xee, 0x43,
- 0xea, 0x4d, 0xac, 0xc0, 0xe7, 0x06, 0xd6, 0x20, 0x33, 0x50, 0x65, 0x94, 0xb8, 0xa1, 0x23, 0xc7,
- 0x4f, 0x8a, 0xbd, 0x95, 0xe4, 0x97, 0xa7, 0x40, 0xca, 0x9f, 0x4b, 0x19, 0xaf, 0x4a, 0x4f, 0x7f,
- 0x28, 0xbd, 0x1f, 0xe4, 0xd4, 0x7e, 0x0a, 0xb2, 0x60, 0x85, 0x50, 0xdd, 0x38, 0xa1, 0xa6, 0xea,
- 0x98, 0x83, 0xf7, 0xa3, 0x38, 0xd0, 0x2d, 0xe5, 0x4b, 0x79, 0x07, 0x1b, 0x71, 0xcd, 0x6d, 0xbb,
- 0x3e, 0x4d, 0xe2, 0xe6, 0x4a, 0xa7, 0xb8, 0x42, 0x40, 0x33, 0x74, 0x63, 0x8b, 0xf9, 0xc1, 0x82,
- 0xff, 0x33, 0x09, 0x1e, 0x26, 0xbb, 0x15, 0xce, 0xe9, 0xc8, 0xd1, 0x8d, 0x53, 0xc7, 0xfd, 0x96,
- 0xb7, 0xd3, 0x63, 0xb7, 0xe9, 0xaf, 0x5e, 0xea, 0x67, 0x50, 0x4b, 0x85, 0xce, 0xec, 0x67, 0xa5,
- 0x87, 0x81, 0x44, 0xea, 0xbe, 0xf2, 0xeb, 0xc4, 0x51, 0x8b, 0x8c, 0x7b, 0x86, 0x75, 0x69, 0x56,
- 0xc7, 0x69, 0xd8, 0x2e, 0x5c, 0x22, 0x6c, 0xff, 0xa3, 0x04, 0x9b, 0x33, 0xc9, 0xcc, 0x25, 0xd7,
- 0x99, 0x4b, 0x4e, 0x0a, 0x0b, 0xba, 0xdf, 0x5f, 0x00, 0xb2, 0x75, 0x3f, 0xd0, 0xb2, 0x81, 0x8d,
- 0x99, 0x5d, 0x91, 0x5f, 0x1d, 0x34, 0xd9, 0xd8, 0x30, 0x0d, 0x70, 0xf3, 0x4d, 0xcd, 0xd2, 0x82,
- 0xa6, 0xa6, 0xf2, 0x01, 0xea, 0x82, 0xe5, 0xc8, 0x6b, 0xad, 0x60, 0x34, 0x09, 0x9b, 0x85, 0x8f,
- 0x0f, 0x9b, 0xc5, 0x7c, 0xd8, 0x6c, 0x24, 0xc7, 0xf1, 0xd0, 0x72, 0xc6, 0xd9, 0x57, 0xd7, 0x19,
- 0x67, 0x8d, 0x51, 0x68, 0x7f, 0x18, 0xe8, 0xc1, 0x4a, 0x41, 0xae, 0xea, 0xca, 0x28, 0xff, 0x5d,
- 0x82, 0xdb, 0x8b, 0x08, 0x93, 0xc5, 0xf9, 0xf9, 0xdc, 0x02, 0x5f, 0x02, 0xf0, 0x8d, 0x69, 0x86,
- 0x6b, 0x52, 0xd1, 0x5d, 0x5c, 0x22, 0x85, 0x2a, 0x47, 0xee, 0xba, 0x26, 0xcb, 0x2d, 0x1b, 0xd1,
- 0xcc, 0x54, 0x1e, 0x3c, 0x01, 0xe5, 0xc0, 0x38, 0x71, 0xb8, 0x03, 0x30, 0xf1, 0xc7, 0x44, 0x0f,
- 0xe8, 0x40, 0x34, 0x61, 0x25, 0x92, 0x81, 0xb0, 0x62, 0x67, 0xe2, 0x8f, 0x45, 0xf2, 0x3d, 0x0d,
- 0x03, 0x86, 0x55, 0xe6, 0x58, 0x73, 0x70, 0x81, 0xcb, 0x66, 0x26, 0xc7, 0x8e, 0x17, 0x0a, 0x11,
- 0x6e, 0x0e, 0x8e, 0x15, 0xc8, 0x35, 0x9e, 0x44, 0x75, 0x90, 0x6f, 0x46, 0x3d, 0x02, 0xa4, 0x9f,
- 0xe9, 0x96, 0xad, 0x1f, 0xdb, 0x89, 0x03, 0x67, 0x95, 0x41, 0x89, 0xcc, 0xc1, 0xf1, 0x16, 0xb4,
- 0x42, 0x76, 0xc4, 0xd3, 0xb3, 0xcd, 0x1b, 0x4e, 0x25, 0x32, 0x0b, 0xc6, 0xbb, 0x70, 0xfb, 0xd8,
- 0x76, 0x19, 0x28, 0xd6, 0xc7, 0xc0, 0x39, 0x12, 0x38, 0xfe, 0xd8, 0x97, 0x81, 0xb7, 0x8b, 0x96,
- 0xe2, 0x30, 0x23, 0xd3, 0x4d, 0x93, 0xc5, 0x51, 0xde, 0x5d, 0xaa, 0x92, 0xf8, 0x95, 0x85, 0x1c,
- 0x23, 0x6e, 0x4c, 0x0e, 0x2d, 0xc7, 0x88, 0xee, 0x26, 0xaa, 0x64, 0x06, 0x8a, 0xb1, 0xb8, 0xa2,
- 0x6c, 0xf0, 0xd1, 0xe8, 0x1e, 0x92, 0x85, 0xab, 0x48, 0x4e, 0xea, 0x87, 0xa9, 0xe5, 0x51, 0x93,
- 0xdf, 0x34, 0x48, 0x64, 0x06, 0x2a, 0x74, 0xb6, 0xab, 0x1b, 0xa7, 0xb6, 0x3b, 0xe6, 0x77, 0x0c,
- 0x25, 0x92, 0x81, 0x28, 0xbf, 0x84, 0x4f, 0x84, 0xc5, 0xbd, 0xa6, 0xc1, 0xbe, 0xee, 0x67, 0x3a,
- 0x6a, 0x3f, 0xd4, 0xb5, 0x7e, 0x9f, 0x76, 0x91, 0x66, 0x69, 0x27, 0x06, 0xdd, 0x85, 0x16, 0x77,
- 0x1b, 0x99, 0x60, 0x25, 0xad, 0xce, 0x37, 0x1b, 0x76, 0x8e, 0xd1, 0x15, 0x7c, 0xfc, 0xbb, 0x94,
- 0xa4, 0x1b, 0xaf, 0x69, 0xc0, 0xe3, 0x98, 0x3f, 0x78, 0xcf, 0xac, 0xc6, 0x9f, 0xea, 0xc6, 0xca,
- 0x43, 0x75, 0x1b, 0xaa, 0x4e, 0x8c, 0x2b, 0x5c, 0x5f, 0x0a, 0xc0, 0x7d, 0x28, 0x4d, 0xd8, 0x61,
- 0x2b, 0x2e, 0x69, 0xf1, 0x2d, 0x5a, 0x75, 0xfb, 0xc0, 0x35, 0xe9, 0x4b, 0x38, 0x54, 0xc9, 0xb0,
- 0x37, 0x1c, 0xa9, 0xfd, 0x11, 0xe1, 0x74, 0x94, 0x67, 0x50, 0x62, 0x23, 0x2c, 0x89, 0x4b, 0xc7,
- 0xd0, 0x15, 0x8c, 0xa1, 0xd9, 0x1f, 0xf4, 0xb5, 0x0c, 0x4c, 0xc2, 0xeb, 0x50, 0xec, 0xec, 0xef,
- 0xa3, 0x82, 0xf2, 0x2b, 0xb8, 0xbf, 0x64, 0xa9, 0xcb, 0x7a, 0x8f, 0x4d, 0x58, 0xe3, 0xd5, 0x6c,
- 0x14, 0xb9, 0xaa, 0x44, 0xbc, 0x29, 0x4e, 0x52, 0xe3, 0xbc, 0xa6, 0x81, 0xb8, 0x6a, 0x5f, 0x41,
- 0x2a, 0xa9, 0x92, 0x0b, 0xd9, 0x2a, 0x79, 0xde, 0xeb, 0x17, 0x17, 0x79, 0xfd, 0xff, 0x92, 0x92,
- 0x04, 0x24, 0x59, 0xf0, 0xff, 0x89, 0x07, 0x4c, 0x43, 0x6e, 0xe9, 0x12, 0x9d, 0xe0, 0xf9, 0xfd,
- 0x96, 0x17, 0xed, 0xf7, 0x9f, 0x6f, 0x40, 0x6d, 0x57, 0x67, 0x39, 0x0d, 0xdf, 0x33, 0xde, 0x11,
- 0xc7, 0x5d, 0xe2, 0x51, 0xec, 0x4e, 0x7e, 0x89, 0x0c, 0x62, 0xfe, 0xb3, 0x84, 0x75, 0xe1, 0x34,
- 0x44, 0x32, 0x70, 0x7b, 0xa1, 0x25, 0x8a, 0x3e, 0x07, 0x89, 0x91, 0xf1, 0xcf, 0xa0, 0x9a, 0x38,
- 0x1b, 0x91, 0x26, 0xde, 0x59, 0x36, 0x93, 0x9a, 0x24, 0x9d, 0xc0, 0x66, 0x27, 0x29, 0xb0, 0x90,
- 0xc8, 0x9d, 0xe5, 0x4d, 0x6e, 0x92, 0x4e, 0xc0, 0x5f, 0x41, 0x25, 0x4e, 0x21, 0xb8, 0x60, 0x6a,
- 0x0b, 0x2e, 0x90, 0xb3, 0xe9, 0x0a, 0x49, 0xd0, 0xf1, 0x63, 0x28, 0xf9, 0xd4, 0x89, 0xfa, 0x72,
- 0xb5, 0x59, 0x05, 0x67, 0xbb, 0x04, 0x1c, 0x0d, 0x77, 0xa1, 0xce, 0x7e, 0x35, 0x2f, 0x6a, 0x1a,
- 0x88, 0x36, 0x47, 0xfb, 0xe2, 0x69, 0x11, 0x1e, 0xa9, 0xf9, 0x99, 0x4e, 0xc3, 0x1f, 0x00, 0x70,
- 0x22, 0x51, 0x8a, 0x51, 0x59, 0xb6, 0xdb, 0xb8, 0x15, 0x40, 0xaa, 0x7e, 0xd2, 0x15, 0x78, 0x91,
- 0xe6, 0x1a, 0xd5, 0x25, 0x1a, 0x12, 0x96, 0x96, 0xf6, 0xbe, 0x1e, 0x41, 0x51, 0x37, 0x4e, 0x79,
- 0xa4, 0xa9, 0xcd, 0x5e, 0x15, 0xa6, 0xc5, 0x26, 0x61, 0x48, 0x4c, 0x2c, 0xef, 0x6d, 0xf7, 0x5b,
- 0x1e, 0x67, 0x2e, 0x12, 0x0b, 0xab, 0x86, 0x08, 0x47, 0xc3, 0xbb, 0x50, 0x0b, 0xd3, 0x1a, 0x46,
- 0x5c, 0x6e, 0x2c, 0x96, 0x4a, 0xa6, 0xd6, 0x21, 0xd9, 0x49, 0x6c, 0x5b, 0x7e, 0x94, 0x46, 0xf2,
- 0xf0, 0x74, 0xd1, 0xb6, 0x44, 0xaa, 0x49, 0x62, 0x64, 0xfc, 0x24, 0xce, 0xd5, 0x9a, 0x7c, 0xd6,
- 0xcd, 0x85, 0xb3, 0x72, 0xc9, 0x5a, 0x0f, 0x9a, 0x06, 0x4b, 0xfd, 0xb5, 0xc4, 0x68, 0x5a, 0x7c,
- 0xaa, 0xb2, 0xd8, 0x5e, 0xb3, 0xd5, 0x07, 0x69, 0x18, 0xb9, 0x62, 0x24, 0x21, 0x15, 0x07, 0x33,
- 0x7e, 0xa7, 0xbe, 0x94, 0x54, 0x1c, 0xdb, 0x05, 0xa9, 0xa4, 0xfe, 0x18, 0xf0, 0x8b, 0xc2, 0x28,
- 0x39, 0x8e, 0x05, 0x71, 0x95, 0x13, 0xfb, 0xd1, 0x52, 0x63, 0x8e, 0x05, 0xd2, 0x9a, 0xce, 0x24,
- 0xe3, 0x8f, 0xa1, 0x34, 0xb5, 0x9c, 0xb1, 0x8c, 0x97, 0xe8, 0x90, 0xe5, 0xa4, 0x84, 0xa3, 0x71,
- 0x74, 0xd7, 0x19, 0xcb, 0xd7, 0x96, 0xa1, 0xbb, 0x1c, 0xdd, 0x75, 0xc6, 0xf8, 0x4f, 0xe1, 0xae,
- 0xb7, 0xbc, 0xcc, 0x91, 0x37, 0x38, 0xa5, 0xe7, 0x0b, 0x29, 0xad, 0x28, 0x91, 0xc8, 0x2a, 0xe2,
- 0xf8, 0x8f, 0xe1, 0x6a, 0x72, 0x67, 0x12, 0x5f, 0x6d, 0xc8, 0xd7, 0xf9, 0x8a, 0x8f, 0x3f, 0xee,
- 0x3e, 0x64, 0x9e, 0x0e, 0xf6, 0x33, 0x37, 0xfb, 0xb3, 0xf7, 0x26, 0xf2, 0x26, 0x5f, 0xe4, 0x27,
- 0xbf, 0xd5, 0xa5, 0x0b, 0xb9, 0x98, 0x2e, 0x3b, 0x44, 0x76, 0xda, 0x5e, 0x97, 0x3f, 0x59, 0x72,
- 0x88, 0xb2, 0x6d, 0xf8, 0xec, 0x24, 0xfc, 0x0d, 0x5c, 0xb3, 0xe7, 0x5b, 0xf4, 0xb2, 0xcc, 0x69,
- 0x6d, 0x5d, 0xb6, 0xa5, 0x4f, 0x16, 0x11, 0xc1, 0x6f, 0xd2, 0xab, 0x5c, 0x5e, 0x4b, 0xc8, 0x37,
- 0x96, 0x99, 0x7a, 0xae, 0xea, 0xc8, 0x4f, 0xc4, 0xbf, 0x81, 0xeb, 0xc6, 0xa2, 0xaa, 0x44, 0xbe,
- 0xc9, 0x29, 0x3e, 0xba, 0x04, 0xc5, 0x98, 0xd3, 0xc5, 0x84, 0xf0, 0x08, 0xae, 0x7a, 0xb3, 0x2d,
- 0x07, 0xf9, 0x16, 0xa7, 0xfe, 0xf0, 0x02, 0x7b, 0x9c, 0xc1, 0x26, 0xf3, 0x04, 0xa2, 0x60, 0x41,
- 0x4f, 0xe5, 0xdb, 0x4b, 0x83, 0x05, 0x3d, 0x25, 0x1c, 0x0d, 0xff, 0x1c, 0xd0, 0x78, 0x26, 0x5d,
- 0x95, 0x3f, 0xe5, 0x53, 0x1f, 0x5c, 0x94, 0xdd, 0xe5, 0x73, 0xdb, 0xb9, 0xe9, 0xd8, 0x02, 0x79,
- 0x7c, 0x41, 0x06, 0x2c, 0xdf, 0x59, 0x62, 0xfc, 0x17, 0xa5, 0xcd, 0xe4, 0x42, 0x72, 0x58, 0x83,
- 0xcd, 0xa8, 0x2f, 0x96, 0xf8, 0x36, 0xcd, 0xe0, 0x5d, 0x35, 0xf9, 0x2e, 0x5f, 0xe8, 0xf3, 0x0b,
- 0x22, 0xc8, 0x7c, 0x1b, 0x8e, 0x6c, 0xe8, 0x8b, 0x9a, 0x73, 0xbf, 0x86, 0x8d, 0xf1, 0x82, 0x24,
- 0x53, 0x6e, 0x2f, 0x21, 0xbf, 0x30, 0x2b, 0x5d, 0x48, 0x06, 0x87, 0x70, 0x7b, 0xbc, 0x24, 0x87,
- 0x95, 0xef, 0xf1, 0x65, 0x9e, 0x5e, 0x7e, 0x99, 0x58, 0x64, 0x4b, 0xc9, 0xb2, 0x4c, 0x66, 0x1c,
- 0xe7, 0x9a, 0xb2, 0xb2, 0x24, 0xb6, 0xa7, 0x19, 0x69, 0x3a, 0x81, 0xd9, 0xed, 0x78, 0x36, 0x53,
- 0x95, 0xef, 0x2f, 0xb1, 0xdb, 0xb9, 0xbc, 0x96, 0xcc, 0x13, 0x50, 0xfe, 0xae, 0x2c, 0xbe, 0x19,
- 0xad, 0xc1, 0x7a, 0x77, 0xd0, 0xef, 0xab, 0xdd, 0x11, 0x2a, 0xe0, 0x06, 0x54, 0xc5, 0x8b, 0xba,
- 0x87, 0x8a, 0xec, 0x75, 0x78, 0xb4, 0x3b, 0xec, 0x92, 0xde, 0xae, 0x8a, 0x4a, 0xfc, 0xf3, 0x51,
- 0x32, 0xd8, 0x3b, 0xea, 0xaa, 0x04, 0x95, 0x71, 0x05, 0x4a, 0x43, 0xb5, 0xbf, 0x87, 0xd6, 0x30,
- 0x82, 0x3a, 0x7b, 0xd2, 0x88, 0xda, 0x55, 0x7b, 0x87, 0x23, 0xb4, 0xce, 0x0a, 0x0c, 0x0e, 0x51,
- 0x09, 0x19, 0x10, 0x54, 0x61, 0x8b, 0x1c, 0xa8, 0xc3, 0x61, 0xe7, 0xb5, 0x8a, 0xaa, 0xbc, 0xb2,
- 0xe8, 0xbe, 0x45, 0xc0, 0x28, 0xbc, 0xda, 0x1f, 0xfc, 0x02, 0xd5, 0x70, 0x0b, 0x6a, 0x47, 0xfd,
- 0x74, 0xa9, 0x3a, 0xbf, 0x1a, 0x3e, 0xea, 0x76, 0xd5, 0xe1, 0x10, 0x35, 0x70, 0x15, 0xca, 0x11,
- 0xa1, 0x26, 0xab, 0x54, 0xba, 0xfb, 0x83, 0xa1, 0xaa, 0x25, 0x8c, 0xb4, 0x52, 0x58, 0x77, 0xd0,
- 0x1f, 0x1e, 0x1d, 0xa8, 0x04, 0x21, 0xbc, 0x01, 0x28, 0xc6, 0xd0, 0x62, 0x42, 0x57, 0xd9, 0x82,
- 0x87, 0xbd, 0xfe, 0x6b, 0x84, 0xf9, 0xd3, 0xa0, 0xff, 0x1a, 0x5d, 0xc3, 0x0f, 0xe0, 0x1e, 0x51,
- 0xf7, 0xd4, 0xfd, 0xde, 0x3b, 0x95, 0x68, 0x47, 0xfd, 0x4e, 0xf7, 0x6d, 0x7f, 0xf0, 0x8b, 0x7d,
- 0x75, 0xef, 0xb5, 0xba, 0xa7, 0x09, 0x9e, 0x87, 0x68, 0x03, 0xcb, 0xb0, 0x71, 0xd8, 0x21, 0xa3,
- 0xde, 0xa8, 0x37, 0xe8, 0xf3, 0x91, 0x51, 0x67, 0xaf, 0x33, 0xea, 0xa0, 0xeb, 0xf8, 0x1e, 0x7c,
- 0xba, 0x68, 0x44, 0x23, 0xea, 0xf0, 0x70, 0xd0, 0x1f, 0xaa, 0x68, 0x93, 0x7f, 0x0c, 0x31, 0x18,
- 0xbc, 0x3d, 0x3a, 0x44, 0x9f, 0xe0, 0x6b, 0xd0, 0x8a, 0x9e, 0x53, 0x04, 0x99, 0x6f, 0x41, 0x30,
- 0xaf, 0x0d, 0x47, 0x9d, 0xd1, 0x10, 0xdd, 0xc0, 0xb7, 0xe0, 0x93, 0x3c, 0x2c, 0x9d, 0x70, 0x93,
- 0xb1, 0x43, 0xd4, 0x4e, 0xf7, 0x8d, 0xba, 0xa7, 0x31, 0x39, 0x0f, 0x5e, 0x69, 0xa3, 0xc1, 0x61,
- 0xaf, 0x8b, 0x6e, 0x45, 0x6a, 0x51, 0xdf, 0xa2, 0xdb, 0xf8, 0x13, 0xb8, 0xf6, 0x5a, 0x1d, 0x69,
- 0xfb, 0x9d, 0xe1, 0x28, 0xde, 0x89, 0xd6, 0xdb, 0x43, 0x9f, 0xe2, 0x36, 0xdc, 0x5e, 0x30, 0x90,
- 0x92, 0xbf, 0x83, 0x6f, 0xc2, 0x66, 0xa7, 0x3b, 0xea, 0xbd, 0x4b, 0x65, 0xaa, 0x75, 0xdf, 0x74,
- 0xfa, 0xaf, 0x55, 0x74, 0x97, 0xf1, 0xc5, 0x66, 0xf3, 0xf5, 0x86, 0x6c, 0xe5, 0x7e, 0xe7, 0x40,
- 0x1d, 0x1e, 0x76, 0xba, 0x2a, 0x6a, 0xe3, 0x1f, 0x41, 0xfb, 0x82, 0xc1, 0x94, 0xfc, 0x3d, 0x66,
- 0x1e, 0x0c, 0x6b, 0xd8, 0x7d, 0xa3, 0x1e, 0x74, 0x90, 0x12, 0x73, 0x1a, 0xbd, 0xa7, 0x88, 0xf7,
- 0x1f, 0x7d, 0xc9, 0xef, 0x48, 0xb3, 0x1f, 0x7a, 0xf2, 0x6f, 0x9c, 0x07, 0x7d, 0x15, 0x5d, 0x61,
- 0x76, 0xb4, 0xff, 0xcd, 0xf3, 0xe8, 0x03, 0xe7, 0x6f, 0xf6, 0x7b, 0xbb, 0xa8, 0xc0, 0x9f, 0x86,
- 0xa3, 0x3d, 0x54, 0x7c, 0xf4, 0xaf, 0x45, 0xa8, 0x65, 0x8a, 0x31, 0x66, 0xa3, 0x47, 0x0e, 0xcb,
- 0x19, 0xc4, 0x3d, 0xc1, 0x15, 0x7c, 0x15, 0x1a, 0x71, 0xbc, 0xcd, 0x5c, 0x40, 0x1c, 0xb2, 0xba,
- 0xc9, 0x0f, 0xa8, 0x63, 0x88, 0x5b, 0x86, 0x02, 0xe3, 0xae, 0x13, 0x06, 0x27, 0xd4, 0x09, 0x2c,
- 0x23, 0xbd, 0xe5, 0x40, 0x45, 0xbc, 0x09, 0xb8, 0x13, 0xdd, 0x4a, 0x7f, 0x97, 0x81, 0x97, 0xd8,
- 0x5a, 0xb1, 0x5f, 0xdb, 0x0d, 0xfd, 0x73, 0x54, 0x66, 0x4a, 0x17, 0xf7, 0xc5, 0x7d, 0x37, 0x20,
- 0x54, 0x37, 0xcf, 0xd1, 0x1a, 0xb3, 0xbc, 0x38, 0x61, 0xdb, 0x8d, 0x7a, 0x3c, 0x3f, 0x0f, 0xdd,
- 0x40, 0x57, 0x3f, 0x18, 0x94, 0x9a, 0x34, 0xca, 0x4f, 0xd1, 0x3a, 0xfe, 0x1c, 0x1e, 0x2c, 0x45,
- 0xfb, 0x60, 0xd0, 0xe8, 0x62, 0xa5, 0xc2, 0xb6, 0x14, 0x5f, 0xa0, 0x44, 0xb3, 0xab, 0x4c, 0x5b,
- 0x2c, 0xbd, 0x9e, 0x4e, 0x5d, 0x2f, 0xa0, 0xa6, 0xa8, 0x0a, 0xa3, 0x41, 0x60, 0xf8, 0xdc, 0x6b,
- 0xf5, 0xdd, 0xe0, 0x95, 0x1b, 0x3a, 0x26, 0xaa, 0x31, 0xc3, 0x1a, 0x66, 0x3e, 0x17, 0x4b, 0x46,
- 0xea, 0xfc, 0x76, 0x26, 0x6e, 0x8a, 0xc5, 0xd0, 0x06, 0xdb, 0xd9, 0xc8, 0x75, 0x0f, 0x74, 0xe7,
- 0x9c, 0x44, 0x75, 0xb2, 0x8f, 0x9a, 0x8c, 0x08, 0xa7, 0x3b, 0xa2, 0xde, 0xc4, 0x72, 0xf4, 0x20,
- 0xde, 0x4c, 0x8b, 0x89, 0x26, 0xd9, 0x0c, 0x13, 0x0d, 0x3f, 0xa9, 0x3d, 0x87, 0x5f, 0x5e, 0x45,
- 0xac, 0xe8, 0x13, 0x8a, 0xae, 0x32, 0xd1, 0xf6, 0xf8, 0x15, 0x92, 0x1e, 0x58, 0xc7, 0x36, 0x8d,
- 0x9c, 0x17, 0xc2, 0x8f, 0xde, 0x02, 0xa4, 0xdf, 0x47, 0xb0, 0x63, 0x93, 0xbe, 0x89, 0x2f, 0xdf,
- 0xaf, 0x41, 0x2b, 0x85, 0xfd, 0xd2, 0xd0, 0xdf, 0x3d, 0x8d, 0x14, 0x9b, 0x02, 0x3b, 0x4c, 0x97,
- 0x3e, 0x2a, 0x3c, 0xfa, 0x5e, 0x82, 0xd6, 0xe1, 0xcc, 0x47, 0x89, 0x6b, 0x50, 0x38, 0x7b, 0x82,
- 0xae, 0xf0, 0x5f, 0x36, 0x93, 0xfd, 0xee, 0xa0, 0x02, 0xff, 0x7d, 0x86, 0x8a, 0xfc, 0xf7, 0x39,
- 0x2a, 0xf1, 0xdf, 0x9f, 0xa0, 0x32, 0xff, 0x7d, 0x81, 0xd6, 0xf8, 0xef, 0xef, 0xa3, 0x75, 0xfe,
- 0xfb, 0x25, 0xaa, 0xf0, 0xdf, 0xaf, 0x22, 0x67, 0x77, 0xf6, 0xf4, 0x09, 0x82, 0xe8, 0xe1, 0x29,
- 0xaa, 0x45, 0x0f, 0x3b, 0xa8, 0x1e, 0x3d, 0x3c, 0x43, 0x8d, 0xdd, 0x87, 0xa0, 0xb8, 0xde, 0x78,
- 0x5b, 0x9f, 0xb2, 0xe4, 0x22, 0x76, 0xe9, 0x86, 0x3b, 0x99, 0xb8, 0xce, 0xb6, 0x1e, 0xff, 0x33,
- 0xe1, 0x4d, 0xf1, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x99, 0x3c, 0x38, 0xad, 0x30, 0x00,
- 0x00,
+var fileDescriptor_PulsarApi_976de857822f53b5 = []byte{
+ // 4429 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x5b, 0x4f, 0x8f, 0xdb, 0x48,
+ 0x76, 0x37, 0xf5, 0xa7, 0x5b, 0x7a, 0xfa, 0x57, 0xae, 0x6e, 0x7b, 0x34, 0xb6, 0xc7, 0x96, 0xe9,
+ 0x3f, 0xdb, 0xe3, 0x1d, 0x77, 0xec, 0xb6, 0xd7, 0x99, 0xf1, 0x6e, 0x80, 0x51, 0xab, 0x69, 0x5b,
+ 0x71, 0x5b, 0xd2, 0x94, 0xd4, 0x5e, 0xcc, 0x64, 0x17, 0x5c, 0x36, 0x59, 0x56, 0x13, 0x4d, 0x91,
+ 0x0a, 0x49, 0xf5, 0xb8, 0xe7, 0x90, 0xdb, 0x22, 0x97, 0x20, 0x41, 0x92, 0x63, 0x80, 0x2c, 0x72,
+ 0xc8, 0x07, 0xc8, 0x2d, 0xc0, 0x7e, 0x83, 0xfd, 0x06, 0x39, 0x05, 0x58, 0x20, 0xc7, 0x05, 0x02,
+ 0x24, 0x1f, 0x20, 0xa8, 0x62, 0xf1, 0x9f, 0x44, 0x49, 0xed, 0x99, 0x3d, 0xe4, 0x24, 0xf2, 0xd5,
+ 0xab, 0x57, 0xaf, 0x5e, 0xfd, 0xea, 0xbd, 0x57, 0xaf, 0x28, 0x68, 0x0c, 0x66, 0x96, 0xa7, 0xb9,
+ 0xed, 0xa9, 0xb9, 0x3b, 0x75, 0x1d, 0xdf, 0xc1, 0xd5, 0x29, 0x27, 0x04, 0x6f, 0xf2, 0xef, 0x73,
+ 0xb0, 0x31, 0xd4, 0x4f, 0xe8, 0x44, 0xc3, 0x18, 0x0a, 0xb6, 0x36, 0xa1, 0x4d, 0xa9, 0x95, 0xdb,
+ 0x29, 0x13, 0xfe, 0x8c, 0x6f, 0x41, 0xc5, 0xe3, 0xad, 0xaa, 0xa1, 0xf9, 0x5a, 0x33, 0xdf, 0xca,
+ 0xed, 0x54, 0x09, 0x04, 0xa4, 0x03, 0xcd, 0xd7, 0xf0, 0x43, 0x28, 0xf8, 0xe7, 0x53, 0xda, 0x2c,
+ 0xb4, 0x72, 0x3b, 0xf5, 0xbd, 0x8f, 0x77, 0x93, 0xc2, 0x77, 0x03, 0xc1, 0xbb, 0xa3, 0xf3, 0x29,
+ 0x25, 0x9c, 0x0d, 0x3f, 0x03, 0x98, 0xba, 0xce, 0x94, 0xba, 0xbe, 0x49, 0xbd, 0x66, 0xb1, 0x95,
+ 0xdf, 0xa9, 0xec, 0x5d, 0x4d, 0x77, 0x7a, 0x4d, 0xcf, 0xdf, 0x6a, 0xd6, 0x8c, 0x92, 0x04, 0xa7,
+ 0xfc, 0x5b, 0x09, 0x0a, 0x4c, 0x0c, 0x2e, 0x41, 0xa1, 0xe7, 0xd8, 0x14, 0x5d, 0xc2, 0x00, 0x1b,
+ 0x43, 0xdf, 0x35, 0xed, 0x31, 0x92, 0x18, 0xf5, 0xcf, 0x3d, 0xc7, 0x46, 0x39, 0x5c, 0x85, 0xd2,
+ 0x80, 0x89, 0x39, 0x9e, 0xbd, 0x43, 0x79, 0x46, 0x6f, 0x9f, 0xb9, 0x0e, 0x2a, 0xb0, 0xa7, 0x7d,
+ 0xc7, 0xb1, 0x50, 0x91, 0x3d, 0x75, 0x6d, 0xff, 0x73, 0xb4, 0x81, 0xcb, 0x50, 0xec, 0xda, 0xfe,
+ 0xe3, 0x67, 0x68, 0x53, 0x3c, 0x3e, 0xd9, 0x43, 0x25, 0xf1, 0xf8, 0xec, 0x29, 0x2a, 0xb3, 0xc7,
+ 0x17, 0x96, 0xa3, 0xf9, 0x08, 0xd8, 0x68, 0x07, 0xce, 0xec, 0xd8, 0xa2, 0xa8, 0xc2, 0x24, 0x1c,
+ 0x68, 0x3e, 0x45, 0x55, 0xf6, 0x34, 0x32, 0x27, 0x14, 0xd5, 0x70, 0x0d, 0xca, 0xec, 0xc9, 0xf3,
+ 0xb5, 0xc9, 0x14, 0xd5, 0x99, 0x1a, 0xe1, 0x3c, 0x50, 0x43, 0xfe, 0x1b, 0x09, 0x6a, 0x6f, 0xa8,
+ 0xe7, 0x69, 0x63, 0xda, 0x35, 0xb8, 0xd9, 0xae, 0x41, 0xc9, 0xa2, 0xc6, 0x98, 0xba, 0x5d, 0x83,
+ 0xdb, 0xbb, 0x40, 0xa2, 0x77, 0xdc, 0x84, 0x4d, 0x6a, 0xfb, 0xee, 0x79, 0xd7, 0x68, 0xe6, 0x78,
+ 0x53, 0xf8, 0x8a, 0x5b, 0x50, 0x9e, 0x6a, 0xae, 0x6f, 0xfa, 0xa6, 0x63, 0x37, 0xf3, 0x2d, 0x69,
+ 0xa7, 0xf8, 0x3c, 0xf7, 0xf0, 0x31, 0x89, 0x89, 0xf8, 0x0e, 0x54, 0x8e, 0x35, 0x5f, 0x3f, 0x51,
+ 0x4d, 0xdb, 0xa0, 0xef, 0x9b, 0x85, 0x88, 0x07, 0x38, 0xb9, 0xcb, 0xa8, 0xf2, 0x5e, 0xac, 0x1c,
+ 0x46, 0x90, 0x3f, 0xa5, 0xe7, 0x62, 0xcd, 0xd9, 0x23, 0xde, 0x86, 0xe2, 0x19, 0x6b, 0xe2, 0x83,
+ 0x97, 0x49, 0xf0, 0x22, 0x3f, 0x83, 0xea, 0x6b, 0x7a, 0x7e, 0xe8, 0xd8, 0xe3, 0x0b, 0xf5, 0x2b,
+ 0x84, 0xfd, 0x2c, 0xa8, 0x2b, 0xb6, 0xee, 0x9e, 0x4f, 0x99, 0x7a, 0xaf, 0xe9, 0xb9, 0xb7, 0xae,
+ 0x67, 0x55, 0xf4, 0xc4, 0x7b, 0x50, 0x9a, 0x50, 0x5f, 0x13, 0xb8, 0x5b, 0x05, 0x94, 0x88, 0x4f,
+ 0xfe, 0xfd, 0x06, 0x34, 0x84, 0xa1, 0xdf, 0x08, 0x1a, 0xbe, 0x03, 0xb5, 0xa9, 0xeb, 0x18, 0x33,
+ 0x9d, 0xba, 0x6a, 0x02, 0xdf, 0xd5, 0x90, 0xd8, 0x0b, 0x71, 0x4e, 0xff, 0x72, 0x46, 0x6d, 0x9d,
+ 0xaa, 0x66, 0x68, 0x77, 0x08, 0x49, 0x5d, 0x03, 0xdf, 0x86, 0xea, 0x74, 0x76, 0x6c, 0x99, 0xde,
+ 0x89, 0xea, 0x9b, 0x13, 0xca, 0x77, 0x42, 0x81, 0x54, 0x04, 0x8d, 0x2d, 0xfd, 0x1c, 0xb6, 0x0b,
+ 0x17, 0xc5, 0x36, 0xfe, 0x11, 0x34, 0x5c, 0x3a, 0xb5, 0x4c, 0x5d, 0xf3, 0xa9, 0xa1, 0xbe, 0x73,
+ 0x9d, 0x49, 0xb3, 0xd8, 0x92, 0x76, 0xca, 0xa4, 0x1e, 0x93, 0x5f, 0xb8, 0xce, 0x84, 0xcf, 0x24,
+ 0x5c, 0x69, 0x95, 0xd9, 0x70, 0x83, 0xb3, 0x55, 0x23, 0xe2, 0x6b, 0x7a, 0xce, 0x14, 0x8d, 0xba,
+ 0xa9, 0xbe, 0xd3, 0xdc, 0x6c, 0xe5, 0x77, 0xca, 0xa4, 0x12, 0xd1, 0x46, 0x0e, 0x56, 0xa0, 0xa2,
+ 0x3b, 0x93, 0xa9, 0x4b, 0x3d, 0x8f, 0x01, 0xa9, 0xd4, 0x92, 0x76, 0xea, 0x7b, 0x9f, 0xa4, 0x35,
+ 0xed, 0xc4, 0x0c, 0x6c, 0xdf, 0x3d, 0x2f, 0xf4, 0xfa, 0x3d, 0x85, 0x24, 0xfb, 0xe1, 0x5d, 0xb8,
+ 0x3c, 0xb3, 0x43, 0x02, 0x35, 0x54, 0xcf, 0xfc, 0x8e, 0x36, 0xcb, 0x2d, 0x69, 0xa7, 0xf6, 0x5c,
+ 0x7a, 0x44, 0x50, 0xb2, 0x6d, 0x68, 0x7e, 0x47, 0xf1, 0x53, 0xb8, 0x62, 0xcf, 0x26, 0xea, 0x24,
+ 0x58, 0x1f, 0x4f, 0x35, 0x6d, 0x95, 0x83, 0xb2, 0x59, 0xe1, 0x28, 0x95, 0x1e, 0x13, 0x6c, 0xcf,
+ 0x26, 0x62, 0xf9, 0xbc, 0xae, 0xbd, 0xcf, 0x1a, 0x71, 0x0b, 0x80, 0x9e, 0x51, 0xdb, 0x0f, 0xcc,
+ 0x5e, 0x6d, 0x49, 0x3b, 0x05, 0x26, 0xbe, 0xcc, 0x89, 0xdc, 0xee, 0x0a, 0x34, 0x68, 0x04, 0x31,
+ 0x66, 0x17, 0xaf, 0x59, 0xe3, 0xc6, 0xbf, 0x91, 0x9e, 0x52, 0x1a, 0x87, 0xa4, 0x4e, 0xd3, 0xb8,
+ 0xfc, 0x51, 0x4a, 0x8c, 0x66, 0x8d, 0x9d, 0x66, 0x3d, 0x58, 0x86, 0x98, 0xdc, 0xb6, 0xc6, 0x0e,
+ 0xfe, 0x14, 0x50, 0x82, 0x71, 0xaa, 0xb9, 0xda, 0xa4, 0xd9, 0x68, 0x49, 0x3b, 0x55, 0x92, 0x10,
+ 0x30, 0x60, 0x64, 0x7c, 0x0f, 0xea, 0xc2, 0x7d, 0x9e, 0x51, 0x97, 0x1b, 0x1b, 0x71, 0xc6, 0x5a,
+ 0x40, 0x7d, 0x1b, 0x10, 0xf1, 0x97, 0xf0, 0x71, 0x6a, 0x61, 0xd5, 0xe3, 0x67, 0x4f, 0x55, 0x6a,
+ 0xeb, 0x8e, 0x41, 0x8d, 0xe6, 0xe5, 0x96, 0xb4, 0x53, 0x7a, 0x5e, 0x7c, 0xa7, 0x59, 0x1e, 0x25,
+ 0x57, 0x93, 0x6b, 0xbd, 0xff, 0xec, 0xa9, 0x12, 0x30, 0xb1, 0x55, 0x77, 0x5c, 0x83, 0x32, 0x77,
+ 0xc8, 0x91, 0x81, 0xf9, 0x30, 0x95, 0x90, 0xc6, 0x80, 0x71, 0x1f, 0x1a, 0x06, 0xb5, 0xcc, 0x33,
+ 0xea, 0xaa, 0x9a, 0xb0, 0xe6, 0x56, 0x4b, 0xda, 0xc9, 0x93, 0x9a, 0x20, 0xb7, 0x03, 0x73, 0xde,
+ 0x82, 0xca, 0x44, 0x73, 0x4f, 0xa9, 0xab, 0x72, 0xc7, 0xbe, 0xcd, 0x16, 0x87, 0x40, 0x40, 0x62,
+ 0x50, 0x90, 0x7f, 0x97, 0x83, 0x2b, 0x43, 0xd3, 0x1e, 0x5b, 0x74, 0x7e, 0xab, 0xa5, 0x77, 0x80,
+ 0x74, 0xe1, 0x1d, 0xb0, 0x00, 0xec, 0x5c, 0x36, 0xb0, 0xa7, 0xda, 0xb9, 0xe5, 0x68, 0x02, 0x69,
+ 0x6c, 0x07, 0x16, 0x49, 0x45, 0xd0, 0x38, 0xc2, 0x1e, 0x40, 0x8d, 0x61, 0x4e, 0xd3, 0xd9, 0x46,
+ 0x72, 0x66, 0x3e, 0xf7, 0x7f, 0x91, 0xed, 0xaa, 0x51, 0x5b, 0x7f, 0xe6, 0xcf, 0xe1, 0xaa, 0x98,
+ 0x81, 0xab, 0x95, 0xab, 0xb2, 0xf1, 0x7d, 0x56, 0x65, 0x73, 0x61, 0x55, 0xe4, 0x7f, 0xcd, 0x43,
+ 0xbd, 0xe3, 0x4c, 0x26, 0x9a, 0x6d, 0x74, 0x1c, 0xdb, 0xa6, 0xba, 0xcf, 0x40, 0xa3, 0x5b, 0x26,
+ 0x53, 0x2d, 0x04, 0x4d, 0xe0, 0xb1, 0x6a, 0x01, 0x35, 0x04, 0xcd, 0x17, 0x50, 0xd1, 0x66, 0xfe,
+ 0x89, 0x3a, 0xa1, 0xfe, 0x89, 0x63, 0x70, 0x93, 0xd5, 0xf7, 0x9a, 0x69, 0x6b, 0xb7, 0x67, 0xfe,
+ 0xc9, 0x1b, 0xde, 0x4e, 0x40, 0x8b, 0x9e, 0xf1, 0x0e, 0xa0, 0x44, 0xd7, 0xc0, 0x2b, 0x0a, 0x97,
+ 0x13, 0x73, 0x71, 0xbf, 0x78, 0x1d, 0xca, 0x9c, 0x53, 0x78, 0x61, 0xa6, 0x7e, 0x89, 0x11, 0x78,
+ 0x10, 0xfb, 0x0c, 0x10, 0x1f, 0x46, 0x77, 0xac, 0x48, 0xd5, 0x20, 0xe2, 0x48, 0x8f, 0x48, 0x23,
+ 0x6c, 0x0a, 0xf5, 0x7d, 0x08, 0x5b, 0x53, 0xd7, 0x79, 0x7f, 0xae, 0xfa, 0x8e, 0x7a, 0xec, 0x3a,
+ 0x0c, 0x60, 0x33, 0xd7, 0x12, 0x3e, 0x0c, 0xf1, 0xa6, 0x91, 0xb3, 0xcf, 0x1b, 0x8e, 0x5c, 0x0b,
+ 0x3f, 0x04, 0xec, 0xb8, 0xe6, 0xd8, 0xb4, 0x35, 0x4b, 0x9d, 0xba, 0xa6, 0xad, 0x9b, 0x53, 0xcd,
+ 0xe2, 0x16, 0x2c, 0x93, 0xcb, 0x61, 0xcb, 0x20, 0x6c, 0xc0, 0x9f, 0x25, 0xd8, 0x63, 0x8d, 0x4b,
+ 0x81, 0xf0, 0xb0, 0xa5, 0x1d, 0x6a, 0xfe, 0x08, 0xb6, 0xd3, 0xdc, 0xc2, 0x88, 0x65, 0xce, 0x8f,
+ 0x93, 0xfc, 0x81, 0x31, 0xe4, 0xbf, 0x97, 0x00, 0xa5, 0xd7, 0x89, 0x1a, 0x7c, 0x7b, 0x53, 0x97,
+ 0xed, 0xa8, 0xb9, 0x95, 0x0a, 0xa8, 0xe1, 0xcc, 0xb3, 0xec, 0x94, 0x5b, 0x6a, 0xa7, 0x1d, 0x40,
+ 0x13, 0xed, 0x7d, 0xe8, 0x26, 0x43, 0xac, 0xb3, 0x4d, 0x58, 0x9f, 0x68, 0xef, 0xc5, 0x96, 0x63,
+ 0x70, 0x97, 0xff, 0x59, 0x82, 0x2d, 0xa1, 0x13, 0xd3, 0x94, 0x50, 0x6f, 0xea, 0xd8, 0x1e, 0xcd,
+ 0x04, 0x90, 0xb4, 0x08, 0xa0, 0x3d, 0x28, 0xb9, 0xa2, 0x0b, 0x57, 0x67, 0x61, 0xaf, 0x86, 0xe6,
+ 0x22, 0x11, 0x5f, 0xe6, 0x54, 0xf2, 0xcb, 0xa6, 0x22, 0xff, 0x8b, 0x04, 0xdb, 0x09, 0x05, 0x3b,
+ 0x27, 0x9a, 0x65, 0x51, 0x7b, 0x4c, 0x33, 0x0d, 0x27, 0x2d, 0x1a, 0xee, 0x29, 0x94, 0xf5, 0xb0,
+ 0xcf, 0x1a, 0x15, 0x63, 0xc6, 0x0f, 0xd4, 0xf1, 0x2b, 0x28, 0x45, 0xb0, 0xc8, 0xda, 0x17, 0xd2,
+ 0xfa, 0x7d, 0x91, 0x4b, 0xef, 0x0b, 0xf9, 0x37, 0x1b, 0x11, 0x56, 0x86, 0xb3, 0x63, 0x4f, 0x77,
+ 0xcd, 0x63, 0xca, 0x92, 0x1c, 0xdf, 0x99, 0x9a, 0xba, 0x80, 0x48, 0xf0, 0x82, 0x65, 0xa8, 0x7a,
+ 0x01, 0x0b, 0x8f, 0x1a, 0x22, 0xe7, 0x4a, 0xd1, 0xf0, 0x97, 0xb0, 0xe9, 0xcd, 0x8e, 0x99, 0xeb,
+ 0xe5, 0x3e, 0xaf, 0xbe, 0x77, 0x7f, 0x21, 0x54, 0xa7, 0x86, 0xda, 0x1d, 0x06, 0xdc, 0x24, 0xec,
+ 0xc6, 0x5c, 0xba, 0xee, 0xd8, 0xde, 0x6c, 0x42, 0x5d, 0x96, 0xdd, 0x14, 0x82, 0xec, 0x26, 0x24,
+ 0x75, 0x0d, 0xfc, 0x09, 0x80, 0xcb, 0x72, 0x1d, 0xcf, 0x67, 0xed, 0x45, 0xde, 0x5e, 0x16, 0x94,
+ 0xae, 0xc1, 0xfc, 0x73, 0xd4, 0x9f, 0x1b, 0x45, 0x24, 0x1e, 0x21, 0x91, 0x9b, 0xe4, 0x1e, 0xd4,
+ 0xa7, 0xae, 0xe9, 0xb8, 0xa6, 0x7f, 0xae, 0x5a, 0xf4, 0x8c, 0x06, 0x9b, 0xb5, 0x48, 0x6a, 0x21,
+ 0xf5, 0x90, 0x11, 0xf1, 0x4d, 0xd8, 0x34, 0x66, 0xae, 0x76, 0x6c, 0x51, 0xbe, 0x3b, 0x4b, 0xcf,
+ 0x0b, 0xbe, 0x3b, 0xa3, 0x24, 0x24, 0x62, 0x05, 0x90, 0xe7, 0x6b, 0xae, 0x1f, 0x6d, 0x00, 0x33,
+ 0xd8, 0x96, 0x95, 0xbd, 0xeb, 0xe9, 0x69, 0xa7, 0x12, 0x6a, 0x52, 0xe7, 0x9d, 0x22, 0x5a, 0x2a,
+ 0x7b, 0x84, 0x8b, 0x65, 0x8f, 0x6c, 0x06, 0x2e, 0xd5, 0x0c, 0x35, 0x8a, 0x13, 0x3c, 0x33, 0x29,
+ 0x91, 0x1a, 0xa3, 0x76, 0x42, 0x22, 0xfe, 0x0c, 0x36, 0x82, 0xf0, 0xcd, 0xb3, 0x91, 0xca, 0xde,
+ 0x76, 0xd6, 0xa1, 0x87, 0x08, 0x1e, 0xfc, 0x2b, 0x68, 0x98, 0xb6, 0xe9, 0x9b, 0x9a, 0x35, 0x70,
+ 0xbc, 0x20, 0x73, 0xaf, 0x71, 0x57, 0xbd, 0xbb, 0x66, 0x15, 0xbb, 0xe9, 0x5e, 0xcf, 0x37, 0x0e,
+ 0x35, 0x9f, 0x7a, 0x3e, 0x99, 0x17, 0x87, 0xbf, 0x84, 0x1b, 0x71, 0xc6, 0x97, 0x44, 0x8e, 0xea,
+ 0xf9, 0x9a, 0x4f, 0x79, 0x16, 0x53, 0x22, 0xd7, 0x22, 0x9e, 0x61, 0x82, 0x65, 0xc8, 0x38, 0xe4,
+ 0x7d, 0xd8, 0x14, 0x98, 0x61, 0xe7, 0x18, 0xe5, 0xbd, 0x6e, 0xcd, 0x3c, 0xf3, 0x2c, 0x3c, 0x64,
+ 0x9d, 0x68, 0x2e, 0x35, 0x90, 0xc4, 0xce, 0x34, 0x2f, 0x34, 0xd3, 0x72, 0xce, 0xa8, 0x8b, 0x72,
+ 0xb8, 0x0e, 0xf0, 0x9a, 0x9e, 0xab, 0xa2, 0x35, 0x2f, 0xff, 0x18, 0x1a, 0x73, 0x1a, 0xb3, 0xce,
+ 0x81, 0xce, 0xe8, 0x12, 0xeb, 0xac, 0x68, 0xae, 0x65, 0xb2, 0x37, 0x49, 0xfe, 0x2f, 0x09, 0x6e,
+ 0x89, 0x09, 0x0f, 0xc2, 0xd0, 0x49, 0x8d, 0x11, 0xdb, 0x12, 0x51, 0x32, 0x91, 0xbd, 0x61, 0xd2,
+ 0x48, 0xcd, 0xcd, 0x23, 0x35, 0x3b, 0x6a, 0xe4, 0x3f, 0x2c, 0x6a, 0x14, 0x3e, 0x30, 0x6a, 0x14,
+ 0x97, 0x46, 0x8d, 0x7f, 0xcf, 0xc1, 0x8f, 0xd6, 0xcc, 0x33, 0xf2, 0xda, 0x37, 0x01, 0xa2, 0x34,
+ 0xc2, 0xe3, 0x6e, 0xa7, 0x46, 0x12, 0x94, 0x75, 0x33, 0xff, 0x45, 0xc2, 0x9b, 0xe7, 0x39, 0xc0,
+ 0xbe, 0xcc, 0x04, 0xd8, 0x3a, 0x3d, 0x76, 0x0f, 0x1d, 0xe7, 0x74, 0x36, 0xe5, 0x0e, 0x24, 0xf6,
+ 0xfb, 0x7f, 0x02, 0x45, 0xea, 0xba, 0x8e, 0xcb, 0x6d, 0xb3, 0x78, 0xce, 0xe7, 0x5e, 0x5b, 0x61,
+ 0x0c, 0x24, 0xe0, 0x63, 0x87, 0x58, 0xb1, 0x81, 0x85, 0x79, 0xc2, 0x57, 0xf9, 0x1e, 0x40, 0x3c,
+ 0x04, 0xae, 0x30, 0xe8, 0xe9, 0x3a, 0xf5, 0xbc, 0x00, 0x6d, 0x0c, 0x61, 0x0c, 0x6d, 0xf2, 0xaf,
+ 0x73, 0x80, 0x85, 0xca, 0x82, 0x9d, 0xaf, 0xff, 0xf7, 0x42, 0xc5, 0x8f, 0xa1, 0xc6, 0xd6, 0x8b,
+ 0x79, 0x21, 0xcd, 0x37, 0xcf, 0x02, 0x03, 0x45, 0xd9, 0x5b, 0xba, 0x6d, 0x09, 0x84, 0x0a, 0x1f,
+ 0x06, 0xa1, 0xe2, 0x07, 0x42, 0x68, 0x63, 0x29, 0x84, 0xfe, 0x23, 0x0f, 0xd7, 0x16, 0xed, 0x10,
+ 0xa1, 0xe6, 0x01, 0xa0, 0x20, 0x99, 0x62, 0x6b, 0x60, 0xea, 0xf4, 0xc8, 0xb5, 0x44, 0xc8, 0x5a,
+ 0xa0, 0xe3, 0x47, 0xb0, 0x35, 0x4f, 0x1b, 0x59, 0x9e, 0x48, 0xb6, 0xb3, 0x9a, 0x70, 0x7f, 0x01,
+ 0x54, 0x4f, 0x32, 0x41, 0x95, 0xa1, 0x59, 0x36, 0x8e, 0xd2, 0x0b, 0x55, 0x58, 0xbb, 0x50, 0xc5,
+ 0x15, 0x0b, 0x15, 0x61, 0x72, 0xe3, 0xc3, 0x31, 0xb9, 0x99, 0xc2, 0x24, 0x4f, 0xf5, 0x83, 0xdc,
+ 0xf4, 0xc4, 0x75, 0x66, 0xe3, 0x13, 0xd5, 0x0b, 0xcc, 0xc0, 0x33, 0xd4, 0x52, 0x3a, 0xd5, 0xe7,
+ 0x89, 0x6a, 0xc0, 0x16, 0x1b, 0x4b, 0x7e, 0x92, 0x42, 0x75, 0x15, 0x4a, 0x84, 0x1a, 0xa6, 0x4b,
+ 0x75, 0xe6, 0xfb, 0x2a, 0xb0, 0x29, 0x72, 0x46, 0x24, 0x25, 0x30, 0x9e, 0x93, 0xff, 0x31, 0x07,
+ 0x8d, 0x70, 0x5b, 0x8a, 0x6a, 0xc4, 0x12, 0x80, 0xdf, 0x82, 0x4a, 0x54, 0xc4, 0x88, 0xeb, 0x13,
+ 0x21, 0x69, 0x21, 0x82, 0xe7, 0x33, 0x22, 0x78, 0xba, 0x08, 0x52, 0x10, 0x27, 0xac, 0x64, 0x11,
+ 0xe4, 0x0e, 0x94, 0xc5, 0x01, 0x96, 0x1a, 0x69, 0xcb, 0xc7, 0xf4, 0x54, 0x60, 0xdd, 0xb8, 0x60,
+ 0x60, 0x8d, 0x23, 0xe6, 0xe6, 0xfa, 0x88, 0x29, 0xcf, 0xa0, 0x12, 0x06, 0x43, 0x6a, 0x1b, 0xf3,
+ 0x53, 0x97, 0x16, 0xa6, 0xbe, 0xb6, 0x76, 0x73, 0x17, 0xaa, 0xc9, 0xc2, 0x43, 0x98, 0x0c, 0x3e,
+ 0x26, 0x95, 0x44, 0xbd, 0x41, 0xfe, 0x07, 0x29, 0x72, 0x38, 0x6c, 0x5c, 0x42, 0x75, 0x6a, 0x4e,
+ 0xfd, 0x3f, 0xc2, 0xf0, 0xcf, 0x01, 0x12, 0xb9, 0x4c, 0x7e, 0x7d, 0x2e, 0x53, 0x9e, 0x84, 0xaf,
+ 0xf2, 0x6f, 0xe2, 0x63, 0x07, 0x53, 0x8a, 0xc3, 0xf9, 0x8f, 0xa0, 0x52, 0xb4, 0x75, 0xf2, 0x99,
+ 0x65, 0xdb, 0x95, 0x5b, 0xa7, 0xc0, 0x71, 0x19, 0xb9, 0xf3, 0x7f, 0x92, 0xa2, 0x03, 0xac, 0x98,
+ 0xc5, 0x7c, 0xba, 0x29, 0x2d, 0xa4, 0x9b, 0x69, 0x8b, 0x30, 0xf5, 0x2e, 0x6c, 0x11, 0x96, 0xdd,
+ 0xbb, 0x54, 0x54, 0x2c, 0xce, 0x55, 0xdd, 0x99, 0xd9, 0x3e, 0xb7, 0x29, 0x2f, 0x3a, 0x35, 0xe2,
+ 0xa6, 0x0e, 0x6b, 0x91, 0xff, 0x37, 0x0f, 0x10, 0x9e, 0x40, 0xf4, 0xd3, 0xf5, 0x9a, 0xfd, 0x14,
+ 0x4a, 0x9a, 0x7e, 0x1a, 0x54, 0x3e, 0x72, 0xdc, 0x36, 0xad, 0x4c, 0x87, 0xd7, 0xd6, 0x4f, 0x77,
+ 0xdb, 0xfa, 0x69, 0x90, 0x66, 0x6b, 0xc1, 0xc3, 0xc2, 0x42, 0xe7, 0x3f, 0x60, 0x5a, 0x43, 0x40,
+ 0x67, 0x9a, 0x65, 0x1a, 0x1a, 0x4f, 0xdc, 0x92, 0xb1, 0x76, 0x67, 0xa9, 0x02, 0x6f, 0xa3, 0x0e,
+ 0xc1, 0x5a, 0x35, 0xce, 0xd2, 0x04, 0xa6, 0xd0, 0x42, 0xb5, 0xfd, 0xda, 0xc2, 0x6e, 0x8d, 0x8a,
+ 0xba, 0xa9, 0x8a, 0xfb, 0xa7, 0xb0, 0x29, 0x26, 0xc8, 0x52, 0xbd, 0xae, 0x6d, 0x98, 0x67, 0xa6,
+ 0x31, 0xd3, 0x2c, 0x74, 0x89, 0xbd, 0x77, 0x66, 0x93, 0x99, 0xc5, 0xdd, 0x30, 0x92, 0xe4, 0xbf,
+ 0x93, 0xa0, 0x31, 0xa7, 0x0b, 0xbe, 0x09, 0xd7, 0x8e, 0xe6, 0x0a, 0x80, 0x1d, 0xc7, 0x75, 0x67,
+ 0x3c, 0xeb, 0x44, 0x97, 0xf0, 0x55, 0xc0, 0x07, 0x34, 0x51, 0x4d, 0xe4, 0xbd, 0x90, 0x84, 0xb7,
+ 0x01, 0x75, 0x4e, 0xa8, 0x7e, 0xea, 0xcd, 0x26, 0x6f, 0x4c, 0x6f, 0xa2, 0xf9, 0xfa, 0x09, 0xca,
+ 0xe1, 0x8f, 0xe1, 0x0a, 0xaf, 0x06, 0x1e, 0xd0, 0x21, 0x75, 0x4d, 0xcd, 0x32, 0xbf, 0xa3, 0x41,
+ 0x87, 0x3c, 0xde, 0x82, 0xc6, 0x01, 0x0d, 0xab, 0x6e, 0x01, 0xb1, 0x20, 0x1f, 0xc3, 0xf5, 0xc8,
+ 0x4e, 0x4c, 0xc9, 0x8e, 0x58, 0xe1, 0xce, 0x89, 0x66, 0x5f, 0x04, 0xa0, 0x32, 0x94, 0x4d, 0x4f,
+ 0xd5, 0x78, 0x5f, 0x1e, 0x1f, 0x23, 0x4f, 0x58, 0x32, 0xbd, 0x40, 0xa4, 0xfc, 0x36, 0x72, 0x53,
+ 0x2f, 0x2c, 0xe7, 0xdb, 0xf5, 0x32, 0xef, 0x43, 0x5d, 0x2c, 0xf7, 0x80, 0xba, 0x13, 0xd3, 0xf7,
+ 0x38, 0xc0, 0x6a, 0x64, 0x8e, 0x2a, 0x8f, 0x22, 0x37, 0x74, 0x64, 0x7b, 0xd1, 0xf1, 0x71, 0xad,
+ 0xf8, 0xd5, 0x29, 0x90, 0xfc, 0x5b, 0x29, 0xe1, 0x55, 0xe9, 0xe9, 0x0f, 0x95, 0xf7, 0x43, 0x9c,
+ 0x1a, 0x4b, 0x82, 0xc2, 0xbe, 0xa9, 0x9a, 0x3a, 0xc3, 0x7b, 0x81, 0xe0, 0xd0, 0x1e, 0x71, 0x69,
+ 0x5d, 0xfe, 0x29, 0x34, 0x85, 0xf2, 0x84, 0x6a, 0xfa, 0x09, 0x35, 0x14, 0xdb, 0xe8, 0xbf, 0x1b,
+ 0x85, 0xa1, 0x71, 0xe5, 0x4c, 0xe4, 0xb7, 0x51, 0x11, 0xa2, 0x63, 0x39, 0x1e, 0x8d, 0x22, 0xed,
+ 0x5a, 0x37, 0xba, 0xc6, 0xa4, 0x73, 0x72, 0x43, 0x8c, 0xfd, 0xe0, 0xa5, 0xfa, 0x6b, 0x09, 0xee,
+ 0x47, 0xb3, 0x15, 0xee, 0xec, 0xc8, 0xd6, 0xf4, 0x53, 0xdb, 0xf9, 0x96, 0x5f, 0x12, 0x85, 0x8e,
+ 0xd6, 0x5b, 0x3f, 0xd4, 0xcf, 0xa0, 0x12, 0x2f, 0x13, 0x43, 0xdc, 0x5a, 0x9f, 0x04, 0xd1, 0x3a,
+ 0x79, 0xf2, 0x2f, 0x23, 0xd7, 0x2e, 0x72, 0xf4, 0x39, 0xd5, 0xa5, 0x79, 0x54, 0xc4, 0x81, 0x3e,
+ 0x77, 0x81, 0x40, 0xff, 0x6f, 0x12, 0x5c, 0x9d, 0x4b, 0x7f, 0x2e, 0x38, 0xce, 0x42, 0x3a, 0x93,
+ 0xcb, 0xb8, 0xd3, 0xf9, 0x0c, 0x90, 0xa5, 0x79, 0xbe, 0x9a, 0x0c, 0x85, 0x0c, 0xa8, 0x79, 0x7e,
+ 0x21, 0x56, 0x67, 0x6d, 0xc3, 0x38, 0x24, 0x2e, 0x96, 0xea, 0x0b, 0x19, 0xa5, 0x7a, 0xf9, 0x3d,
+ 0x54, 0x85, 0xca, 0x81, 0x9f, 0x5b, 0xa3, 0x68, 0x14, 0x68, 0x73, 0x1f, 0x1e, 0x68, 0xf3, 0xe9,
+ 0x40, 0x5b, 0x8b, 0x36, 0xf0, 0xc0, 0xb4, 0xc7, 0xc9, 0x57, 0xc7, 0x1e, 0x27, 0xc1, 0x28, 0x56,
+ 0x9f, 0x1d, 0xed, 0xd7, 0x1a, 0x72, 0x5d, 0x65, 0x48, 0xfe, 0xef, 0x02, 0xdc, 0xc8, 0x12, 0x4c,
+ 0xb2, 0x33, 0xfa, 0x85, 0x01, 0x3e, 0x07, 0xe0, 0x13, 0x53, 0x75, 0xc7, 0xa0, 0xa2, 0x48, 0xbd,
+ 0xc2, 0x0a, 0x65, 0xce, 0xdc, 0x71, 0x0c, 0x96, 0x8d, 0xd6, 0x82, 0x9e, 0xb1, 0x3d, 0x78, 0xca,
+ 0xca, 0x89, 0x61, 0xaa, 0x71, 0x13, 0x60, 0xe2, 0x8d, 0x89, 0xe6, 0xd3, 0xbe, 0x28, 0xf7, 0x4b,
+ 0x24, 0x41, 0x61, 0xc7, 0xa3, 0x89, 0x37, 0x16, 0xe9, 0xfa, 0x74, 0xe6, 0x33, 0xae, 0x22, 0xe7,
+ 0x5a, 0xa0, 0x0b, 0x5e, 0xd6, 0x33, 0xda, 0x76, 0xfc, 0x68, 0x11, 0xf0, 0xa6, 0xe8, 0x58, 0x86,
+ 0x54, 0xf1, 0x4b, 0x9c, 0x27, 0xd2, 0x05, 0xb1, 0x07, 0x80, 0xb4, 0x33, 0xcd, 0xb4, 0xb4, 0x63,
+ 0x2b, 0x72, 0xf9, 0x25, 0xee, 0xe2, 0x16, 0xe8, 0x78, 0x07, 0x1a, 0x33, 0xb6, 0xc5, 0xe3, 0xbd,
+ 0xcd, 0x8b, 0x5e, 0x05, 0x32, 0x4f, 0xc6, 0xfb, 0x70, 0xe3, 0xd8, 0x72, 0x18, 0x29, 0x5c, 0x8f,
+ 0xbe, 0x7d, 0x24, 0x78, 0xbc, 0xb1, 0xd7, 0x04, 0x5e, 0xed, 0x59, 0xc9, 0xc3, 0x40, 0xa6, 0x19,
+ 0x06, 0x8b, 0xbc, 0xbc, 0xc2, 0x55, 0x26, 0xe1, 0x2b, 0x0b, 0x52, 0x7a, 0x58, 0xde, 0x1e, 0x9a,
+ 0xb6, 0x1e, 0xdc, 0xb8, 0x95, 0xc9, 0x1c, 0x15, 0x63, 0x71, 0xed, 0x5f, 0xe3, 0xad, 0xc1, 0xdd,
+ 0x3e, 0x0b, 0x70, 0x81, 0x9d, 0x94, 0xf7, 0x53, 0xd3, 0xa5, 0x06, 0xaf, 0x3c, 0x49, 0x64, 0x8e,
+ 0x2a, 0xd6, 0x6c, 0x5f, 0xd3, 0x4f, 0x2d, 0x67, 0xcc, 0x6f, 0xce, 0x0a, 0x24, 0x41, 0x91, 0xbf,
+ 0x86, 0x8f, 0x04, 0xe2, 0x5e, 0x52, 0xff, 0x50, 0xf3, 0x12, 0x55, 0xbd, 0x1f, 0xea, 0x5a, 0x7f,
+ 0x1d, 0xd7, 0x9d, 0xe6, 0x65, 0x47, 0x80, 0xee, 0x40, 0x83, 0xbb, 0x8d, 0x44, 0x78, 0x93, 0xd6,
+ 0x67, 0xa8, 0x35, 0x2b, 0xa5, 0xe8, 0x1a, 0x3d, 0xfe, 0x53, 0x8a, 0x12, 0x94, 0x97, 0xd4, 0xe7,
+ 0x71, 0xcc, 0xeb, 0xbf, 0x63, 0xa8, 0xf1, 0xa6, 0x9a, 0xbe, 0x76, 0x53, 0xdd, 0x80, 0xb2, 0x1d,
+ 0xf2, 0x0a, 0xd7, 0x17, 0x13, 0x70, 0x0f, 0x0a, 0x13, 0xb6, 0xd9, 0xf2, 0x2b, 0xca, 0x8c, 0x59,
+ 0xa3, 0xee, 0xbe, 0x71, 0x0c, 0xfa, 0x1c, 0x06, 0x0a, 0x19, 0x76, 0x87, 0x23, 0xa5, 0x37, 0x22,
+ 0x5c, 0x8e, 0xfc, 0x04, 0x0a, 0xac, 0x85, 0xa5, 0x7d, 0x71, 0x1b, 0xba, 0x84, 0x31, 0xd4, 0x7b,
+ 0xfd, 0x9e, 0x9a, 0xa0, 0x49, 0x78, 0x13, 0xf2, 0xed, 0xc3, 0x43, 0x94, 0x93, 0x7f, 0x01, 0x77,
+ 0x56, 0x0c, 0x75, 0x51, 0xef, 0x71, 0x15, 0x36, 0xf8, 0xf9, 0x37, 0x88, 0x5c, 0x65, 0x22, 0xde,
+ 0x64, 0x3b, 0x3a, 0x15, 0xbd, 0xa4, 0xbe, 0xf8, 0x7c, 0x65, 0x8d, 0xa8, 0xe8, 0x5c, 0x9d, 0x4b,
+ 0x9e, 0xab, 0x17, 0xbd, 0x7e, 0x3e, 0xcb, 0xeb, 0xff, 0x41, 0x8a, 0x12, 0x90, 0x68, 0xc0, 0xff,
+ 0x27, 0x1e, 0x30, 0x0e, 0xb9, 0x85, 0x0b, 0x54, 0xa3, 0x17, 0xe7, 0x5b, 0xcc, 0x9a, 0xef, 0xff,
+ 0x5c, 0x83, 0xca, 0xbe, 0xc6, 0x72, 0x1a, 0x3e, 0x67, 0xbc, 0x27, 0xb6, 0xbb, 0xc4, 0xa3, 0xd8,
+ 0xcd, 0xf4, 0x10, 0x09, 0xc6, 0xf4, 0xa7, 0x3e, 0x9b, 0xc2, 0x69, 0x88, 0x64, 0xe0, 0x46, 0x26,
+ 0x12, 0x45, 0x65, 0x84, 0x84, 0xcc, 0xf8, 0x67, 0x50, 0x8e, 0x9c, 0x8d, 0x48, 0x2c, 0x6f, 0xae,
+ 0xea, 0x49, 0x0d, 0x12, 0x77, 0x60, 0xbd, 0xa3, 0xa4, 0x59, 0x58, 0xe4, 0xe6, 0xea, 0x42, 0x3b,
+ 0x89, 0x3b, 0xe0, 0x2f, 0xa0, 0x14, 0xa6, 0x10, 0xdc, 0x30, 0x95, 0x8c, 0xcf, 0x22, 0x92, 0xe9,
+ 0x0a, 0x89, 0xd8, 0xf1, 0x43, 0x28, 0x78, 0xd4, 0x0e, 0x2a, 0x79, 0x95, 0xf9, 0x05, 0x4e, 0xd6,
+ 0x15, 0x38, 0x1b, 0xee, 0x40, 0x95, 0xfd, 0xaa, 0x6e, 0x50, 0x66, 0x10, 0x85, 0x91, 0xd6, 0xf2,
+ 0x6e, 0x01, 0x1f, 0xa9, 0x78, 0x89, 0xda, 0xc4, 0x9f, 0x01, 0x70, 0x21, 0x41, 0x8a, 0x51, 0x5a,
+ 0x35, 0xdb, 0xb0, 0x78, 0x40, 0xca, 0x5e, 0x54, 0x47, 0x78, 0x16, 0xe7, 0x1a, 0xe5, 0x15, 0x2b,
+ 0x24, 0x90, 0x16, 0x57, 0xcb, 0x1e, 0x40, 0x5e, 0xd3, 0x4f, 0x79, 0xa4, 0xa9, 0xcc, 0xdf, 0x38,
+ 0xc7, 0xc7, 0x53, 0xc2, 0x98, 0x98, 0x59, 0xde, 0x59, 0xce, 0xb7, 0x3c, 0xce, 0x2c, 0x33, 0x0b,
+ 0x3b, 0x3f, 0x11, 0xce, 0x86, 0xf7, 0xa1, 0x32, 0x8b, 0x4f, 0x3d, 0xe2, 0x82, 0x25, 0xdb, 0x2a,
+ 0x89, 0xd3, 0x11, 0x49, 0x76, 0x62, 0xd3, 0xf2, 0x82, 0x34, 0x92, 0x87, 0xa7, 0x65, 0xd3, 0x12,
+ 0xa9, 0x26, 0x09, 0x99, 0xf1, 0xa3, 0x30, 0x57, 0xab, 0xf3, 0x5e, 0xd7, 0x32, 0x7b, 0xa5, 0x92,
+ 0xb5, 0x2e, 0xd4, 0x75, 0x96, 0xfa, 0xab, 0x11, 0x68, 0x1a, 0xbc, 0xab, 0x9c, 0x8d, 0xd7, 0xe4,
+ 0xe9, 0x83, 0xd4, 0xf4, 0xd4, 0x61, 0x24, 0x12, 0x15, 0x06, 0x33, 0xfe, 0xa5, 0xc8, 0x4a, 0x51,
+ 0x61, 0x6c, 0x17, 0xa2, 0xa2, 0xf3, 0x47, 0x9f, 0xdf, 0x7f, 0x06, 0xc9, 0x71, 0x68, 0x88, 0xcb,
+ 0x5c, 0xd8, 0xdd, 0x95, 0x60, 0x0e, 0x0d, 0xd2, 0x98, 0xce, 0x25, 0xe3, 0x0f, 0xa1, 0x30, 0x35,
+ 0xed, 0x31, 0xff, 0xa8, 0x64, 0xd9, 0x1a, 0xb2, 0x9c, 0x94, 0x70, 0x36, 0xce, 0xee, 0xd8, 0x63,
+ 0xfe, 0x75, 0xc9, 0x52, 0x76, 0x87, 0xb3, 0x3b, 0xf6, 0x18, 0xff, 0x15, 0xdc, 0x72, 0x57, 0x1f,
+ 0x73, 0xf8, 0x37, 0x28, 0x95, 0xbd, 0xa7, 0x99, 0x92, 0xd6, 0x1c, 0x91, 0xc8, 0x3a, 0xe1, 0xf8,
+ 0x2f, 0xe0, 0x72, 0x74, 0xcb, 0x12, 0x5e, 0x86, 0x34, 0xaf, 0xf0, 0x11, 0x1f, 0x7e, 0xd8, 0x0d,
+ 0xca, 0xa2, 0x1c, 0xec, 0x25, 0xbe, 0x21, 0x99, 0xbf, 0x69, 0x69, 0x5e, 0xe5, 0x83, 0xfc, 0xe4,
+ 0x7b, 0x5d, 0xd3, 0x90, 0xe5, 0x72, 0xd9, 0x26, 0xb2, 0xe2, 0x82, 0x7c, 0xf3, 0xa3, 0x15, 0x9b,
+ 0x28, 0x59, 0xb8, 0x4f, 0x76, 0xc2, 0xdf, 0xc0, 0x96, 0xb5, 0x58, 0xd4, 0x6f, 0x36, 0xb9, 0xac,
+ 0x9d, 0x8b, 0x5e, 0x02, 0x90, 0x2c, 0x21, 0xf8, 0x55, 0x7c, 0x9d, 0xcc, 0xcf, 0x12, 0xcd, 0x8f,
+ 0x57, 0x41, 0x3d, 0x75, 0xea, 0x48, 0x77, 0xc4, 0xbf, 0x82, 0x2b, 0x7a, 0xd6, 0xa9, 0xa4, 0x79,
+ 0x8d, 0x4b, 0x7c, 0x70, 0x01, 0x89, 0xa1, 0xa6, 0xd9, 0x82, 0xf0, 0x08, 0x2e, 0xbb, 0xf3, 0x25,
+ 0x87, 0xe6, 0x75, 0x2e, 0xfd, 0xfe, 0x12, 0x3c, 0xce, 0x71, 0x93, 0x45, 0x01, 0x41, 0xb0, 0xa0,
+ 0xa7, 0xcd, 0x1b, 0x2b, 0x83, 0x05, 0x3d, 0x25, 0x9c, 0x0d, 0x7f, 0x05, 0x68, 0x3c, 0x97, 0xae,
+ 0x36, 0x3f, 0xe1, 0x5d, 0xef, 0x2d, 0xcb, 0xee, 0xd2, 0xb9, 0xed, 0x42, 0x77, 0x6c, 0x42, 0x73,
+ 0xbc, 0x24, 0x03, 0x6e, 0xde, 0x5c, 0x01, 0xfe, 0x65, 0x69, 0x33, 0x59, 0x2a, 0x0e, 0xab, 0x70,
+ 0x35, 0xa8, 0xa4, 0x45, 0xbe, 0x4d, 0xd5, 0x79, 0x1d, 0xae, 0x79, 0x8b, 0x0f, 0xf4, 0xe9, 0x92,
+ 0x08, 0xb2, 0x58, 0xb8, 0x23, 0xdb, 0x5a, 0x56, 0x39, 0xef, 0x97, 0xb0, 0x3d, 0xce, 0x48, 0x32,
+ 0x9b, 0xad, 0x15, 0xe2, 0x33, 0xb3, 0xd2, 0x4c, 0x31, 0x78, 0x06, 0x37, 0xc6, 0x2b, 0x72, 0xd8,
+ 0xe6, 0x6d, 0x3e, 0xcc, 0xe3, 0x8b, 0x0f, 0x13, 0x9a, 0x6c, 0xa5, 0x58, 0x96, 0xc9, 0x8c, 0xc3,
+ 0x5c, 0xb3, 0x29, 0xaf, 0x88, 0xed, 0x71, 0x46, 0x1a, 0x77, 0x60, 0xb8, 0x1d, 0xcf, 0x67, 0xaa,
+ 0xcd, 0x3b, 0x2b, 0x70, 0xbb, 0x90, 0xd7, 0x92, 0x45, 0x01, 0x6c, 0xe7, 0x6a, 0xc9, 0x0f, 0x79,
+ 0x9a, 0x77, 0x57, 0xec, 0xdc, 0xd4, 0x27, 0x3f, 0x24, 0xdd, 0x11, 0x2b, 0x50, 0xd5, 0x12, 0xdf,
+ 0x2c, 0x35, 0xef, 0x71, 0x41, 0xb7, 0x97, 0x0a, 0x8a, 0xb4, 0x4a, 0x75, 0x93, 0x7f, 0x57, 0x14,
+ 0xdf, 0x85, 0x57, 0x60, 0xb3, 0xd3, 0xef, 0xf5, 0x94, 0xce, 0x08, 0xe5, 0x70, 0x0d, 0xca, 0xe2,
+ 0x45, 0x39, 0x40, 0x79, 0xf6, 0x3a, 0x3c, 0xda, 0x1f, 0x76, 0x48, 0x77, 0x5f, 0x41, 0x05, 0xfe,
+ 0x89, 0x38, 0xe9, 0x1f, 0x1c, 0x75, 0x14, 0x12, 0x7c, 0x0e, 0x3e, 0x54, 0x7a, 0x07, 0x68, 0x03,
+ 0x23, 0xa8, 0xb2, 0x27, 0x95, 0x28, 0x1d, 0xa5, 0x3b, 0x18, 0xa1, 0x4d, 0x76, 0xe2, 0xe1, 0x14,
+ 0x85, 0x90, 0x3e, 0x41, 0x25, 0x36, 0xc8, 0x1b, 0x65, 0x38, 0x6c, 0xbf, 0x54, 0x50, 0x99, 0x1f,
+ 0x75, 0x3a, 0xaf, 0x11, 0x30, 0x09, 0x2f, 0x0e, 0xfb, 0x3f, 0x47, 0x15, 0xdc, 0x80, 0xca, 0x51,
+ 0x2f, 0x1e, 0xaa, 0xca, 0x6f, 0xb7, 0x8f, 0x3a, 0x1d, 0x65, 0x38, 0x44, 0x35, 0x5c, 0x86, 0x62,
+ 0x20, 0xa8, 0xce, 0x8e, 0x4e, 0x9d, 0xc3, 0xfe, 0x50, 0x51, 0x23, 0x45, 0x1a, 0x31, 0xad, 0xd3,
+ 0xef, 0x0d, 0x8f, 0xde, 0x28, 0x04, 0x21, 0xbc, 0x0d, 0x28, 0xe4, 0x50, 0x43, 0x41, 0x97, 0xd9,
+ 0x80, 0x83, 0x6e, 0xef, 0x25, 0xc2, 0xfc, 0xa9, 0xdf, 0x7b, 0x89, 0xb6, 0xf0, 0x3d, 0xb8, 0x4d,
+ 0x94, 0x03, 0xe5, 0xb0, 0xfb, 0x56, 0x21, 0xea, 0x51, 0xaf, 0xdd, 0x79, 0xdd, 0xeb, 0xff, 0xfc,
+ 0x50, 0x39, 0x78, 0xa9, 0x1c, 0xa8, 0x42, 0xe7, 0x21, 0xda, 0xc6, 0x4d, 0xd8, 0x1e, 0xb4, 0xc9,
+ 0xa8, 0x3b, 0xea, 0xf6, 0x7b, 0xbc, 0x65, 0xd4, 0x3e, 0x68, 0x8f, 0xda, 0xe8, 0x0a, 0xbe, 0x0d,
+ 0x9f, 0x64, 0xb5, 0xa8, 0x44, 0x19, 0x0e, 0xfa, 0xbd, 0xa1, 0x82, 0xae, 0xf2, 0xef, 0x39, 0xfa,
+ 0xfd, 0xd7, 0x47, 0x03, 0xf4, 0x11, 0xde, 0x82, 0x46, 0xf0, 0x1c, 0x33, 0x34, 0xf9, 0x14, 0x84,
+ 0xf2, 0xea, 0x70, 0xd4, 0x1e, 0x0d, 0xd1, 0xc7, 0xf8, 0x3a, 0x7c, 0x94, 0xa6, 0xc5, 0x1d, 0xae,
+ 0x31, 0x75, 0x88, 0xd2, 0xee, 0xbc, 0x52, 0x0e, 0x54, 0x66, 0xe7, 0xfe, 0x0b, 0x75, 0xd4, 0x1f,
+ 0x74, 0x3b, 0xe8, 0x7a, 0xb0, 0x2c, 0xca, 0x6b, 0x74, 0x03, 0x7f, 0x04, 0x5b, 0x2f, 0x95, 0x91,
+ 0x7a, 0xd8, 0x1e, 0x8e, 0xc2, 0x99, 0xa8, 0xdd, 0x03, 0xf4, 0x09, 0x6e, 0xc1, 0x8d, 0x8c, 0x86,
+ 0x58, 0xfc, 0x4d, 0x7c, 0x0d, 0xae, 0xb6, 0x3b, 0xa3, 0xee, 0xdb, 0xd8, 0xa6, 0x6a, 0xe7, 0x55,
+ 0xbb, 0xf7, 0x52, 0x41, 0xb7, 0x98, 0x5e, 0xac, 0x37, 0x1f, 0x6f, 0xc8, 0x46, 0xee, 0xb5, 0xdf,
+ 0x28, 0xc3, 0x41, 0xbb, 0xa3, 0xa0, 0x16, 0xbe, 0x0b, 0xad, 0x25, 0x8d, 0xb1, 0xf8, 0xdb, 0x0c,
+ 0x1e, 0x8c, 0x6b, 0xd8, 0x79, 0xa5, 0xbc, 0x69, 0x23, 0x39, 0xd4, 0x34, 0x78, 0x8f, 0x19, 0xef,
+ 0x30, 0xbb, 0xb4, 0x8f, 0x46, 0xaf, 0xd8, 0xe0, 0x87, 0x87, 0x0a, 0x1b, 0xff, 0x2e, 0xbe, 0x0c,
+ 0x35, 0x4e, 0x8b, 0xd8, 0xee, 0x3d, 0x38, 0xe0, 0xb7, 0xc1, 0xc9, 0xcf, 0xae, 0xf9, 0xdf, 0x1d,
+ 0xfa, 0x3d, 0x05, 0x5d, 0x62, 0x70, 0x3b, 0xfc, 0xe6, 0x69, 0xf0, 0x5f, 0x87, 0x6f, 0x0e, 0xbb,
+ 0xfb, 0x28, 0xc7, 0x9f, 0x86, 0x23, 0x86, 0x70, 0x80, 0x8d, 0x61, 0xaf, 0x3d, 0x18, 0x7c, 0x8d,
+ 0x0a, 0x0f, 0xfe, 0x90, 0x87, 0x4a, 0xe2, 0x40, 0xc9, 0x60, 0x7d, 0x64, 0xb3, 0xbc, 0x47, 0xdc,
+ 0x8e, 0x5c, 0x62, 0x43, 0x87, 0x39, 0x43, 0xe2, 0xda, 0x65, 0xc0, 0xce, 0x7e, 0x9e, 0x4f, 0x6d,
+ 0x5d, 0xdc, 0xad, 0xe4, 0xd8, 0x84, 0xd8, 0xde, 0xa3, 0xb6, 0x6f, 0xea, 0xf1, 0xdd, 0x0e, 0xca,
+ 0xe3, 0xab, 0x80, 0xdb, 0xc1, 0x5d, 0xfc, 0x77, 0x09, 0x7a, 0x81, 0x8d, 0x15, 0xfa, 0xe6, 0xfd,
+ 0x99, 0x77, 0x8e, 0x8a, 0x0c, 0x27, 0xe2, 0x96, 0xbc, 0xe7, 0xf8, 0x84, 0x6a, 0xc6, 0x39, 0xda,
+ 0x60, 0x60, 0x0d, 0x93, 0xce, 0xfd, 0xa0, 0x4e, 0xf5, 0xd5, 0xcc, 0xf1, 0x35, 0xe5, 0xbd, 0x4e,
+ 0xa9, 0x41, 0x83, 0x1c, 0x1b, 0x6d, 0xe2, 0x4f, 0xe1, 0xde, 0x4a, 0xb6, 0xf7, 0x3a, 0x0d, 0xae,
+ 0x93, 0x4a, 0x6c, 0x4a, 0xe1, 0xb5, 0x51, 0xd0, 0xbb, 0xcc, 0x16, 0x98, 0x1d, 0x11, 0xa6, 0x53,
+ 0xc7, 0xf5, 0xa9, 0x21, 0x4e, 0xb6, 0x41, 0x23, 0x30, 0x7e, 0xee, 0x79, 0x7b, 0x8e, 0xff, 0xc2,
+ 0x99, 0xd9, 0x06, 0xaa, 0x30, 0x2c, 0x26, 0xbf, 0x8c, 0x8a, 0x5a, 0xaa, 0xfc, 0x4e, 0x2a, 0x2c,
+ 0xec, 0x85, 0xd4, 0x1a, 0x9b, 0xd9, 0xc8, 0x71, 0xde, 0x68, 0xf6, 0x39, 0x09, 0xce, 0xfa, 0x1e,
+ 0xaa, 0x33, 0x21, 0x5c, 0xee, 0x88, 0xba, 0x13, 0xd3, 0xd6, 0xfc, 0x70, 0x32, 0x0d, 0x66, 0x9a,
+ 0x68, 0x32, 0xcc, 0x34, 0x7c, 0x73, 0x77, 0x6d, 0x7e, 0x65, 0x17, 0xa8, 0xa2, 0x4d, 0x28, 0xba,
+ 0xcc, 0x4c, 0xdb, 0xe5, 0x17, 0x67, 0x9a, 0x6f, 0x1e, 0x5b, 0x34, 0x70, 0xc0, 0x08, 0xb3, 0xb5,
+ 0x08, 0x95, 0x68, 0x7b, 0x9e, 0x39, 0x16, 0x53, 0xd9, 0x7a, 0xf0, 0x1a, 0x20, 0xfe, 0x5c, 0x84,
+ 0x43, 0x2d, 0xfe, 0x54, 0x31, 0xf8, 0xa7, 0xcc, 0x16, 0x34, 0x62, 0xda, 0xd7, 0xba, 0xf6, 0xf6,
+ 0x71, 0xb0, 0xe2, 0x31, 0xb1, 0xcd, 0x16, 0xd9, 0x43, 0xb9, 0x07, 0x7f, 0x2b, 0x41, 0x63, 0x30,
+ 0xf7, 0x41, 0xea, 0x06, 0xe4, 0xce, 0x1e, 0xa1, 0x4b, 0xfc, 0x97, 0xf5, 0x64, 0xbf, 0x7b, 0x28,
+ 0xc7, 0x7f, 0x9f, 0xa0, 0x3c, 0xff, 0x7d, 0x8a, 0x0a, 0xfc, 0xf7, 0x27, 0xa8, 0xc8, 0x7f, 0x9f,
+ 0xa1, 0x0d, 0xfe, 0xfb, 0xa7, 0x68, 0x93, 0xff, 0x7e, 0x8e, 0x4a, 0xfc, 0xf7, 0x8b, 0xc0, 0x71,
+ 0x9e, 0x3d, 0x7e, 0x84, 0x20, 0x78, 0x78, 0x8c, 0x2a, 0xc1, 0xc3, 0x1e, 0xaa, 0x06, 0x0f, 0x4f,
+ 0x50, 0x2d, 0x78, 0x78, 0x8a, 0xea, 0xfb, 0xf7, 0x41, 0x76, 0xdc, 0xf1, 0xae, 0x36, 0x65, 0x29,
+ 0x54, 0x18, 0x1d, 0x74, 0x67, 0x32, 0x71, 0xec, 0x5d, 0x2d, 0xfc, 0x4f, 0xd3, 0xab, 0xfc, 0xff,
+ 0x05, 0x00, 0x00, 0xff, 0xff, 0x47, 0xe6, 0x3b, 0xba, 0xe7, 0x34, 0x00, 0x00,
}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index d45253d..de190e0 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -42,6 +42,10 @@
// Failover subscription mode, multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
+
+ // KeyShared subscription mode, multiple consumers will be able to use the same subscription, and all messages with the same key
+ // will be dispatched to only one consumer.
+ KeyShared
)
type InitialPosition int
@@ -135,6 +139,9 @@
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
+ // ReceiveAsync appends the message to the msgs channel asynchronously.
+ ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
+
// Ack the consumption of a single message
Ack(Message) error
@@ -170,5 +177,5 @@
// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
// breaks, the messages are redelivered after reconnect.
- RedeliverUnackedMessages()
+ RedeliverUnackedMessages() error
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
new file mode 100644
index 0000000..8eeb784
--- /dev/null
+++ b/pulsar/consumer_test.go
@@ -0,0 +1,341 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "context"
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "log"
+ "net/http"
+ "strings"
+ "testing"
+ "time"
+)
+
+var (
+ adminURL = "http://localhost:8080"
+ lookupURL = "pulsar://localhost:6650"
+)
+// TestProducerConsumer sends 10 messages to "my-topic" and expects a Shared consumer to receive them in order, acking each one.
+func TestProducerConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "my-topic"
+ ctx := context.Background()
+
+ // create the consumer before anything is produced
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer (batching left enabled)
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages; NOTE(review): log.Fatal exits the process without running the deferred Close calls above, t.Fatal looks more appropriate
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // receive 10 messages and check that payloads arrive in send order
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+ // ack message
+ if err := consumer.Ack(msg); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // unsubscribe consumer
+ if err := consumer.Unsubscribe(); err != nil {
+ log.Fatal(err)
+ }
+}
+// TestConsumerConnectError expects Subscribe to return no consumer and the error "connection error" when the client URL is pulsar://invalid-hostname:6650.
+func TestConsumerConnectError(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://invalid-hostname:6650",
+ })
+
+ assert.Nil(t, err)
+
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-topic",
+ SubscriptionName: "my-subscription",
+ })
+
+ // Expect error in creating consumer
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+ // EqualError reports a nil error as a failure instead of panicking on err.Error().
+ assert.EqualError(t, err, "connection error")
+}
+// TestConsumerWithInvalidConf checks that Subscribe rejects options without a subscription name (SubscriptionNotFound) and without a topic (TopicNotFound).
+func TestConsumerWithInvalidConf(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ return
+ }
+
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-topic",
+ })
+
+ // Expect error in creating consumer: the subscription name is missing
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+
+ fmt.Println(err.Error())
+ assert.Equal(t, SubscriptionNotFound, err.(*Error).Result())
+
+ consumer, err = client.Subscribe(ConsumerOptions{
+ SubscriptionName: "my-subscription",
+ })
+
+ // Expect error in creating consumer: the topic is missing
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, TopicNotFound, err.(*Error).Result())
+}
+// TestConsumer_SubscriptionEarliestPos publishes two messages, then subscribes with SubscriptionInitPos: Earliest and expects to receive the first one.
+func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix())
+ subName := "test-subscription-initial-earliest-position"
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send two messages before the consumer is created
+ ctx := context.Background()
+
+ err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("msg-1-content-1"),
+ })
+ assert.Nil(t, err)
+
+ err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("msg-1-content-2"),
+ })
+ assert.Nil(t, err)
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ SubscriptionInitPos: Earliest,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+
+ assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+}
+// makeHTTPCall sends body as a JSON request to urls using method and fails the test if the request cannot be built or sent.
+func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+ client := http.Client{}
+
+ req, err := http.NewRequest(method, urls, strings.NewReader(body))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+ // NOTE(review): the response status code is never inspected, so only transport errors fail the test.
+ res, err := client.Do(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer res.Body.Close()
+}
+// TestConsumerShared attaches two KeyShared consumers to subscription "sub-1", publishes 10 messages spread over 3 keys, and prints what each consumer receives.
+func TestConsumerShared(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/test-topic-6"
+
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Type: KeyShared,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Type: KeyShared,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, &ProducerMessage{
+ Key: fmt.Sprintf("key-shared-%d", i%3),
+ Payload: []byte(fmt.Sprintf("value-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ time.Sleep(time.Second * 5)
+ // NOTE(review): the test returns right after starting these goroutines and never waits for them, so their assertions may not run before it ends.
+ go func() {
+ for i := 0; i < 10; i++ {
+ msg, err := consumer1.Receive(ctx)
+ assert.Nil(t, err)
+ if msg != nil {
+ fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
+ err = consumer1.Ack(msg)
+ assert.Nil(t, err)
+ }
+ }
+ }()
+ // NOTE(review): each goroutine loops for 10 messages although only 10 are published in total across both consumers.
+ go func() {
+ for i := 0; i < 10; i++ {
+ msg2, err := consumer2.Receive(ctx)
+ assert.Nil(t, err)
+ if msg2 != nil {
+ fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
+ err = consumer2.Ack(msg2)
+ assert.Nil(t, err)
+ }
+ }
+ }()
+}
+// TestPartitionTopicsConsumerPubSub PUTs "3" to the admin partitions endpoint, checks the three partition names, then expects an Exclusive consumer to receive all 10 produced messages.
+func TestPartitionTopicsConsumerPubSub(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/testGetPartitions"
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
+
+ makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ topics, err := client.TopicPartitions(topic)
+ assert.Nil(t, err)
+ assert.Equal(t, topic+"-partition-0", topics[0])
+ assert.Equal(t, topic+"-partition-1", topics[1])
+ assert.Equal(t, topic+"-partition-2", topics[2])
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ ctx := context.Background()
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ msgs := make([]string, 0, 10)
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ msgs = append(msgs, string(msg.Payload()))
+
+ fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+ msg.ID(), string(msg.Payload()))
+
+ if err := consumer.Ack(msg); err != nil {
+ assert.Nil(t, err)
+ }
+ }
+ // expected value goes first so a failure labels expected/actual correctly
+ assert.Equal(t, 10, len(msgs))
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index 8b73a95..0231913 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -59,8 +59,8 @@
//ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
//ProducerQueueIsFull Result = 26 // Producer queue is full
//MessageTooBig Result = 27 // Trying to send a messages exceeding the max size
- //TopicNotFound Result = 28 // Topic not found
- //SubscriptionNotFound Result = 29 // Subscription not found
+ TopicNotFound Result = 28 // Topic not found
+ SubscriptionNotFound Result = 29 // Subscription not found
//ConsumerNotFound Result = 30 // Consumer not found
//UnsupportedVersionError Result = 31 // Error when an older client/version doesn't support a required feature
//TopicTerminated Result = 32 // Topic was already terminated
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index b73080b..aeda1c0 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -96,8 +96,12 @@
}
func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
- // TODO: Implement consumer
- return nil, nil
+ consumer, err := newConsumer(client, &options)
+ if err != nil {
+ return nil, err
+ }
+ client.handlers[consumer] = true
+ return consumer, nil
}
func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
new file mode 100644
index 0000000..0a44971
--- /dev/null
+++ b/pulsar/impl_consumer.go
@@ -0,0 +1,256 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/util"
+ "github.com/golang/protobuf/proto"
+
+ log "github.com/sirupsen/logrus"
+)
+
+// consumer is the topic-level Consumer implementation. It wraps one
+// Consumer per partition of the topic and routes acks/seeks to the
+// partition encoded in the message ID.
+type consumer struct {
+ topicName string // topic name this consumer was created for
+ consumers []Consumer // per-partition consumers, indexed by partition
+ log *log.Entry // logger used when a message ID cannot be decoded
+ queue chan ConsumerMessage // buffer that Receive reads messages from
+ unackMsgTracker *UnackedMessageTracker // NOTE(review): not referenced by the methods in this file — confirm it is needed
+}
+
+func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
+ if options == nil {
+ return nil, newError(ResultInvalidConfiguration, "consumer configuration undefined")
+ }
+
+ if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
+ return nil, newError(TopicNotFound, "topic is required")
+ }
+
+ if options.SubscriptionName == "" {
+ return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
+ }
+
+ if options.ReceiverQueueSize == 0 {
+ options.ReceiverQueueSize = 1000
+ }
+
+ if options.TopicsPattern != "" {
+ if options.Topics != nil {
+ return nil, newError(ResultInvalidConfiguration, "Topic names list must be null when use topicsPattern")
+ }
+ // TODO: impl logic
+ } else if options.Topics != nil && len(options.Topics) > 1 {
+ // TODO: impl logic
+ } else if options.Topics != nil && len(options.Topics) == 1 || options.Topic != "" {
+ var singleTopicName string
+ if options.Topic != "" {
+ singleTopicName = options.Topic
+ } else {
+ singleTopicName = options.Topics[0]
+ }
+ return singleTopicSubscribe(client, options, singleTopicName)
+ }
+
+ return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
+}
+
+// singleTopicSubscribe creates a consumer for one topic by subscribing to
+// each of its partitions concurrently.
+//
+// If any partition fails to subscribe, every partition consumer that was
+// created is closed again and the first subscribe error is returned.
+func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
+	c := &consumer{
+		topicName: topic,
+		// The logger must be set here: the ack/seek paths log through c.log.
+		log:   log.WithField("topic", topic),
+		queue: make(chan ConsumerMessage, options.ReceiverQueueSize),
+	}
+
+	partitions, err := client.TopicPartitions(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	numPartitions := len(partitions)
+	c.consumers = make([]Consumer, numPartitions)
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		cons      Consumer
+	}
+
+	// Buffered so that no worker goroutine blocks on its result.
+	ch := make(chan ConsumerError, numPartitions)
+
+	for partitionIdx, partitionTopic := range partitions {
+		go func(partitionIdx int, partitionTopic string) {
+			cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx)
+			ch <- ConsumerError{
+				err:       err,
+				partition: partitionIdx,
+				cons:      cons,
+			}
+		}(partitionIdx, partitionTopic)
+	}
+
+	// Keep the first failure; a later success must not overwrite it.
+	var firstErr error
+	for i := 0; i < numPartitions; i++ {
+		ce := <-ch
+		if ce.err != nil {
+			if firstErr == nil {
+				firstErr = ce.err
+			}
+			continue
+		}
+		c.consumers[ce.partition] = ce.cons
+	}
+
+	if firstErr != nil {
+		// Since there were some failures, clean up all the partitions that
+		// succeeded in creating the consumers. Cleanup failures are logged,
+		// never substituted for the original subscribe error.
+		for _, pc := range c.consumers {
+			if pc == nil || util.IsNil(pc) {
+				continue
+			}
+			if closeErr := pc.Close(); closeErr != nil {
+				c.log.WithError(closeErr).Warn("Failed to close partition consumer during cleanup")
+			}
+		}
+		return nil, firstErr
+	}
+
+	return c, nil
+}
+
+// Topic returns the topic name this consumer was created with.
+func (c *consumer) Topic() string {
+ return c.topicName
+}
+
+// Subscription returns the subscription name as reported by the first
+// partition consumer. It indexes consumers[0] unconditionally, so the
+// consumer must own at least one partition consumer.
+func (c *consumer) Subscription() string {
+ return c.consumers[0].Subscription()
+}
+
+func (c *consumer) Unsubscribe() error {
+ var errMsg string
+ for _, c := range c.consumers {
+ if err := c.Unsubscribe(); err != nil {
+ errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+ }
+ }
+ if errMsg != "" {
+ return errors.New(errMsg)
+ }
+ return nil
+}
+
+// Receive blocks until a message is available on any partition or ctx is
+// done, and returns that message.
+//
+// For the duration of the call one pump goroutine per partition forwards
+// messages into c.queue. The pumps run under a context that is cancelled
+// when Receive returns, so repeated calls do not accumulate goroutines
+// (they previously lived until the caller's ctx ended). Messages a pump
+// forwards after Receive has returned stay in c.queue for the next call.
+func (c *consumer) Receive(ctx context.Context) (Message, error) {
+	pumpCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	for _, pc := range c.consumers {
+		go func(pc Consumer) {
+			// The pump's error is deliberately dropped: in the normal case
+			// it is just the cancellation triggered by Receive returning.
+			_ = pc.ReceiveAsync(pumpCtx, c.queue)
+		}(pc)
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case msg, ok := <-c.queue:
+		if !ok {
+			return nil, errors.New("receive message error")
+		}
+		return msg.Message, nil
+	}
+}
+
+// ReceiveAsync is not implemented yet for the topic-level consumer: it
+// ignores both arguments and always returns nil.
+func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+ //TODO: impl logic
+ return nil
+}
+
+// Ack acknowledges the consumption of a single message by delegating to
+// AckID with the message's ID.
+func (c *consumer) Ack(msg Message) error {
+ return c.AckID(msg.ID())
+}
+
+// AckID acknowledges the consumption of a single message, identified by its
+// MessageID.
+//
+// The ack is routed to the partition consumer named by the partition index
+// encoded in the ID; a negative index is treated as partition 0. An error is
+// returned if the ID cannot be decoded or the index does not match any
+// partition consumer.
+func (c *consumer) AckID(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(msgID.Serialize(), id); err != nil {
+		// c.log may be unset; never let logging turn an error into a panic.
+		if c.log != nil {
+			c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		}
+		return err
+	}
+
+	partition := int(id.GetPartition())
+	if partition < 0 {
+		partition = 0
+	}
+	if partition >= len(c.consumers) {
+		return fmt.Errorf("invalid partition index %d: consumer has %d partition(s)", partition, len(c.consumers))
+	}
+	return c.consumers[partition].AckID(msgID)
+}
+
+// AckCumulative delegates to AckCumulativeID with the message's ID.
+func (c *consumer) AckCumulative(msg Message) error {
+ return c.AckCumulativeID(msg.ID())
+}
+
+// AckCumulativeID forwards a cumulative ack for msgID to the partition
+// consumer named by the partition index encoded in the ID.
+//
+// An error is returned if the ID cannot be decoded or the partition index
+// is negative or beyond the partition consumers owned by this consumer.
+func (c *consumer) AckCumulativeID(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(msgID.Serialize(), id); err != nil {
+		// c.log may be unset; never let logging turn an error into a panic.
+		if c.log != nil {
+			c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		}
+		return err
+	}
+
+	partition := int(id.GetPartition())
+	if partition < 0 || partition >= len(c.consumers) {
+		return errors.New("invalid partition index")
+	}
+	return c.consumers[partition].AckCumulativeID(msgID)
+}
+
+func (c *consumer) Close() error {
+ for _, pc := range c.consumers {
+ return pc.Close()
+ }
+ return nil
+}
+
+// Seek forwards a seek to msgID to the partition consumer named by the
+// partition index encoded in the ID.
+//
+// An error is returned if the ID cannot be decoded or the partition index
+// is negative or beyond the partition consumers owned by this consumer.
+func (c *consumer) Seek(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(msgID.Serialize(), id); err != nil {
+		// c.log may be unset; never let logging turn an error into a panic.
+		if c.log != nil {
+			c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		}
+		return err
+	}
+
+	partition := int(id.GetPartition())
+	if partition < 0 || partition >= len(c.consumers) {
+		return errors.New("invalid partition index")
+	}
+	return c.consumers[partition].Seek(msgID)
+}
+
+func (c *consumer) RedeliverUnackedMessages() error {
+ var errMsg string
+ for _, c := range c.consumers {
+ if err := c.RedeliverUnackedMessages(); err != nil {
+ errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+ }
+ }
+
+ if errMsg != "" {
+ return errors.New(errMsg)
+ }
+ return nil
+}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a7b1207..38d372a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,10 +18,16 @@
package pulsar
import (
+ "time"
+
"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/golang/protobuf/proto"
)
+// earliestMessageID returns the message ID built with -1 for every
+// component passed to newMessageID.
+func earliestMessageID() MessageID {
+ return newMessageID(-1, -1, -1, -1)
+}
+
type messageID struct {
ledgerID int64
entryID int64
@@ -64,12 +70,59 @@
return id, nil
}
-func earliestMessageID() MessageID {
- return newMessageID(-1, -1, -1, -1)
-}
-
const maxLong int64 = 0x7fffffffffffffff
func latestMessageID() MessageID {
return newMessageID(maxLong, maxLong, -1, -1)
}
+
+func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
+ ts := int64(timestamp) * int64(time.Millisecond)
+ seconds := ts / int64(time.Second)
+ nanos := ts - (seconds * int64(time.Second))
+ return time.Unix(seconds, nanos)
+}
+
+// timeToUnixTimestampMillis converts t into a Unix timestamp expressed in
+// milliseconds, truncating any sub-millisecond remainder.
+func timeToUnixTimestampMillis(t time.Time) uint64 {
+	return uint64(t.UnixNano() / int64(time.Millisecond))
+}
+
+// message is the Message value handed to consumers. Its fields are exposed
+// read-only through the accessor methods below.
+type message struct {
+ publishTime time.Time
+ eventTime time.Time
+ key string
+ payLoad []byte
+ msgID MessageID
+ properties map[string]string
+ topic string
+}
+
+// Topic returns the topic stored on the message.
+func (msg *message) Topic() string {
+ return msg.topic
+}
+
+// Properties returns the message's property map itself, not a copy.
+func (msg *message) Properties() map[string]string {
+ return msg.properties
+}
+
+// Payload returns the message's payload slice itself, not a copy.
+func (msg *message) Payload() []byte {
+ return msg.payLoad
+}
+
+// ID returns the message's MessageID.
+func (msg *message) ID() MessageID {
+ return msg.msgID
+}
+
+// PublishTime returns the publish time stored on the message.
+func (msg *message) PublishTime() time.Time {
+ return msg.publishTime
+}
+
+// EventTime returns the event time stored on the message.
+func (msg *message) EventTime() time.Time {
+ return msg.eventTime
+}
+
+// Key returns the key stored on the message.
+func (msg *message) Key() string {
+ return msg.key
+}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
new file mode 100644
index 0000000..0d7069f
--- /dev/null
+++ b/pulsar/impl_partition_consumer.go
@@ -0,0 +1,688 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sync"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/util"
+ "github.com/golang/protobuf/proto"
+
+ log "github.com/sirupsen/logrus"
+)
+
+// maxRedeliverUnacknowledged is the largest number of message IDs that
+// internalRedeliver places in a single redeliver request.
+const maxRedeliverUnacknowledged = 1000
+
+// consumerState is the lifecycle state of a partitionConsumer.
+type consumerState int
+
+// Lifecycle states, in the order a partitionConsumer moves through them.
+// The constants are typed so they cannot be mixed with plain integers.
+const (
+	consumerInit consumerState = iota
+	consumerReady
+	consumerClosing
+	consumerClosed
+)
+
+// Subscription type and initial position, written by setDefault and
+// newPartitionConsumer.
+// NOTE(review): these are package-level and therefore shared by every
+// partitionConsumer, and singleTopicSubscribe builds partition consumers
+// from separate goroutines — confirm the writes cannot race or leak one
+// consumer's settings into another.
+var (
+ subType pb.CommandSubscribe_SubType
+ position pb.CommandSubscribe_InitialPosition
+)
+
+// partitionConsumer is the Consumer implementation for a single partition
+// (or a non-partitioned topic). Requests such as ack, seek and close are
+// sent as events over eventsChan and executed by runEventsLoop.
+type partitionConsumer struct {
+ state consumerState // lifecycle state, see the consumer* constants
+ client *client
+ topic string
+ log *log.Entry
+ cnx internal.Connection // connection obtained by grabCnx
+
+ options *ConsumerOptions // options pointer passed in by the caller (not copied)
+ consumerName *string // nil unless a name was configured or reported back
+ consumerID uint64
+ subQueue chan ConsumerMessage // messages delivered by HandlerMessage
+
+ omu sync.Mutex // protects following
+ overflow []*pb.MessageIdData // IDs of delivered messages, sent back by internalRedeliver
+
+ unAckTracker *UnackedMessageTracker // nil unless created in newPartitionConsumer
+
+ eventsChan chan interface{} // unbuffered request channel read by runEventsLoop
+ partitionIdx int // partition index stamped into received message IDs
+}
+
+// newPartitionConsumer subscribes to a single partition topic and starts the
+// consumer's event loop.
+//
+// options is used as passed (not copied): defaults are written into it, and
+// a MessageChannel is created on it when the caller supplied none. For
+// Shared and KeyShared subscriptions an unacked-message tracker is started;
+// it is stopped again if the subscription cannot be established.
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID int) (*partitionConsumer, error) {
+	c := &partitionConsumer{
+		state:        consumerInit,
+		client:       client,
+		topic:        topic,
+		options:      options,
+		log:          log.WithField("topic", topic),
+		consumerID:   client.rpcClient.NewConsumerID(),
+		partitionIdx: partitionID,
+		eventsChan:   make(chan interface{}),
+		subQueue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
+	}
+
+	c.setDefault(options)
+
+	if options.MessageChannel == nil {
+		options.MessageChannel = make(chan ConsumerMessage, options.ReceiverQueueSize)
+	} else {
+		c.subQueue = options.MessageChannel
+	}
+
+	if options.Name != "" {
+		c.consumerName = &options.Name
+	}
+
+	switch options.Type {
+	case Exclusive:
+		subType = pb.CommandSubscribe_Exclusive
+	case Failover:
+		subType = pb.CommandSubscribe_Failover
+	case Shared:
+		subType = pb.CommandSubscribe_Shared
+	case KeyShared:
+		subType = pb.CommandSubscribe_Key_Shared
+	}
+
+	if options.Type == Shared || options.Type == KeyShared {
+		if options.AckTimeout != 0 {
+			c.unAckTracker = NewUnackedMessageTracker()
+			c.unAckTracker.pc = c
+			c.unAckTracker.Start(int64(options.AckTimeout))
+		}
+	}
+
+	switch options.SubscriptionInitPos {
+	case Latest:
+		position = pb.CommandSubscribe_Latest
+	case Earliest:
+		position = pb.CommandSubscribe_Earliest
+	}
+
+	if err := c.grabCnx(); err != nil {
+		// The consumer is discarded, so release the tracker started above.
+		if c.unAckTracker != nil {
+			c.unAckTracker.Stop()
+		}
+		// Log through c.log so the topic field is attached to the failure.
+		c.log.WithError(err).Errorf("Failed to create consumer")
+		return nil, err
+	}
+
+	// consumerName is a *string: log the name itself, not the pointer.
+	if c.consumerName != nil {
+		c.log = c.log.WithField("name", *c.consumerName)
+	}
+	c.log.Info("Created consumer")
+	c.state = consumerReady
+	go c.runEventsLoop()
+
+	return c, nil
+}
+
+// setDefault fills in default values on options in place and resets the
+// package-level position and subType to Latest and Exclusive.
+func (pc *partitionConsumer) setDefault(options *ConsumerOptions) {
+ // Non-positive queue sizes fall back to 1000.
+ if options.ReceiverQueueSize <= 0 {
+ options.ReceiverQueueSize = 1000
+ }
+
+ // An unset ack timeout becomes 30 seconds, so AckTimeout is never zero
+ // after this call.
+ if options.AckTimeout == 0 {
+ options.AckTimeout = time.Second * 30
+ }
+
+ position = pb.CommandSubscribe_Latest
+ subType = pb.CommandSubscribe_Exclusive
+}
+
+// grabCnx looks up the broker serving the topic, sends the SUBSCRIBE command
+// and, on success, registers this consumer as the connection's message
+// handler and grants the broker the initial flow permits.
+//
+// It is used both for the initial subscription and for reconnects, so the
+// subscription type and initial position are derived from this consumer's
+// own options rather than from the shared package-level variables.
+func (pc *partitionConsumer) grabCnx() error {
+	lr, err := pc.client.lookupService.Lookup(pc.topic)
+	if err != nil {
+		pc.log.WithError(err).Warn("Failed to lookup topic")
+		return err
+	}
+
+	// Defaults mirror setDefault: Exclusive subscription starting at Latest.
+	subType := pb.CommandSubscribe_Exclusive
+	switch pc.options.Type {
+	case Failover:
+		subType = pb.CommandSubscribe_Failover
+	case Shared:
+		subType = pb.CommandSubscribe_Shared
+	case KeyShared:
+		subType = pb.CommandSubscribe_Key_Shared
+	}
+
+	position := pb.CommandSubscribe_Latest
+	if pc.options.SubscriptionInitPos == Earliest {
+		position = pb.CommandSubscribe_Earliest
+	}
+
+	pc.log.Debug("Lookup result: ", lr)
+	requestID := pc.client.rpcClient.NewRequestID()
+	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
+		pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
+			RequestId:       proto.Uint64(requestID),
+			Topic:           proto.String(pc.topic),
+			SubType:         subType.Enum(),
+			Subscription:    proto.String(pc.options.SubscriptionName),
+			ConsumerId:      proto.Uint64(pc.consumerID),
+			ConsumerName:    proto.String(pc.options.Name),
+			InitialPosition: position.Enum(),
+			Schema:          nil,
+		})
+
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to create consumer")
+		return err
+	}
+
+	if res.Response.ConsumerStatsResponse != nil {
+		pc.consumerName = res.Response.ConsumerStatsResponse.ConsumerName
+	}
+
+	pc.cnx = res.Cnx
+	pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
+
+	msgType := res.Response.GetType()
+
+	switch msgType {
+	case pb.BaseCommand_SUCCESS:
+		pc.cnx.AddConsumeHandler(pc.consumerID, pc)
+		// Let the broker start pushing up to a full receiver queue.
+		if err := pc.internalFlow(uint32(pc.options.ReceiverQueueSize)); err != nil {
+			return err
+		}
+		return nil
+	case pb.BaseCommand_ERROR:
+		errMsg := res.Response.GetError()
+		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
+	default:
+		return util.NewUnexpectedErrMsg(msgType, requestID)
+	}
+}
+
+// Topic returns the partition topic this consumer is attached to.
+func (pc *partitionConsumer) Topic() string {
+ return pc.topic
+}
+
+// Subscription returns the subscription name from the consumer options.
+func (pc *partitionConsumer) Subscription() string {
+ return pc.options.SubscriptionName
+}
+
+func (pc *partitionConsumer) Unsubscribe() error {
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ hu := &handleUnsubscribe{
+ waitGroup: wg,
+ err: nil,
+ }
+ pc.eventsChan <- hu
+
+ wg.Wait()
+ return hu.err
+}
+
+// internalUnsubscribe runs on the event loop. It sends UNSUBSCRIBE to the
+// broker, removes this consumer's handler from the connection, stops the
+// unacked-message tracker and finally releases the waiting caller. A
+// request failure is recorded on unsub but does not skip the cleanup.
+func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
+	defer unsub.waitGroup.Done()
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmd := &pb.CommandUnsubscribe{
+		RequestId:  proto.Uint64(requestID),
+		ConsumerId: proto.Uint64(pc.consumerID),
+	}
+	if _, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID, pb.BaseCommand_UNSUBSCRIBE, cmd); err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		unsub.err = err
+	}
+
+	pc.cnx.DeleteConsumeHandler(pc.consumerID)
+	if tracker := pc.unAckTracker; tracker != nil {
+		tracker.Stop()
+	}
+}
+
+// Receive blocks until a message arrives on subQueue or ctx is done.
+//
+// A received message's ID is decoded and, when a tracker is configured,
+// registered as unacked before the message is returned. A closed queue
+// yields a ResultConnectError.
+func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
+	select {
+	case cm, open := <-pc.subQueue:
+		if !open {
+			return nil, newError(ResultConnectError, "receive queue closed")
+		}
+
+		msgIDData := &pb.MessageIdData{}
+		if err := proto.Unmarshal(cm.ID().Serialize(), msgIDData); err != nil {
+			pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+			return nil, err
+		}
+		if pc.unAckTracker != nil {
+			pc.unAckTracker.Add(msgIDData)
+		}
+		return cm.Message, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+// ReceiveAsync forwards messages from subQueue to msgs until ctx is done,
+// the queue is closed, or an error occurs.
+//
+// Flow permits are granted in batches of half the message channel's
+// capacity (at least 1): once up front and again each time that many
+// messages have been forwarded. It returns ctx.Err() on cancellation and a
+// ResultConnectError once subQueue has been closed.
+func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+	highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
+
+	// request half the buffer's capacity
+	if err := pc.internalFlow(highwater); err != nil {
+		pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+		return err
+	}
+	var receivedSinceFlow uint32
+
+	for {
+		select {
+		case tmpMsg, ok := <-pc.subQueue:
+			if !ok {
+				// A closed channel is always ready to receive; without this
+				// return the loop would spin at full speed.
+				return newError(ResultConnectError, "receive queue closed")
+			}
+
+			msgs <- tmpMsg
+			id := &pb.MessageIdData{}
+			if err := proto.Unmarshal(tmpMsg.ID().Serialize(), id); err != nil {
+				pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+				return err
+			}
+			if pc.unAckTracker != nil {
+				pc.unAckTracker.Add(id)
+			}
+
+			receivedSinceFlow++
+			if receivedSinceFlow >= highwater {
+				if err := pc.internalFlow(receivedSinceFlow); err != nil {
+					pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+					return err
+				}
+				receivedSinceFlow = 0
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+// Ack acknowledges a single message by delegating to AckID with its ID.
+func (pc *partitionConsumer) Ack(msg Message) error {
+ return pc.AckID(msg.ID())
+}
+
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ ha := &handleAck{
+ msgID: msgID,
+ waitGroup: wg,
+ err: nil,
+ }
+ pc.eventsChan <- ha
+ wg.Wait()
+ return ha.err
+}
+
+// internalAck runs on the event loop and sends an individual ACK for the
+// message named by ack.msgID without waiting for a broker response.
+//
+// A message ID that cannot be decoded is reported through ack.err and
+// nothing is sent. Otherwise the ID is removed from the unacked-message
+// tracker even if sending failed. The waiting caller is always released.
+func (pc *partitionConsumer) internalAck(ack *handleAck) {
+	defer ack.waitGroup.Done()
+
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(ack.msgID.Serialize(), id); err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		ack.err = err
+		// Do not ack an empty ID, and do not let a later success hide this error.
+		return
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
+		pb.BaseCommand_ACK, &pb.CommandAck{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			MessageId:  []*pb.MessageIdData{id},
+			AckType:    pb.CommandAck_Individual.Enum(),
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to ack message")
+		ack.err = err
+	}
+
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Remove(id)
+	}
+}
+
+// AckCumulative delegates to AckCumulativeID with the message's ID.
+func (pc *partitionConsumer) AckCumulative(msg Message) error {
+ return pc.AckCumulativeID(msg.ID())
+}
+
+// AckCumulativeID hands a cumulative-ack request for msgID to the event
+// loop.
+//
+// NOTE(review): unlike AckID, nothing here waits for the event loop to
+// finish the request. hac.err is read right after the send, so the returned
+// error may not reflect the outcome and the read can race with
+// internalAckCumulative writing it — confirm and add a completion signal.
+func (pc *partitionConsumer) AckCumulativeID(msgID MessageID) error {
+ hac := &handleAckCumulative{
+ msgID: msgID,
+ err: nil,
+ }
+ pc.eventsChan <- hac
+
+ return hac.err
+}
+
+// internalAckCumulative runs on the event loop and sends a cumulative ACK
+// for the message named by ackCumulative.msgID, waiting for the request to
+// complete.
+//
+// A message ID that cannot be decoded is reported through
+// ackCumulative.err and nothing is sent. Otherwise the ID is removed from
+// the unacked-message tracker even if the request failed.
+func (pc *partitionConsumer) internalAckCumulative(ackCumulative *handleAckCumulative) {
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(ackCumulative.msgID.Serialize(), id); err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		ackCumulative.err = err
+		// Do not ack an empty ID, and do not let a later success hide this error.
+		return
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
+		pb.BaseCommand_ACK, &pb.CommandAck{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			MessageId:  []*pb.MessageIdData{id},
+			AckType:    pb.CommandAck_Cumulative.Enum(),
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to cumulatively ack message")
+		ackCumulative.err = err
+	}
+
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Remove(id)
+	}
+}
+
+func (pc *partitionConsumer) Close() error {
+ if pc.state != consumerReady {
+ return nil
+ }
+ if pc.unAckTracker != nil {
+ pc.unAckTracker.Stop()
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+
+ cc := &handlerClose{&wg, nil}
+ pc.eventsChan <- cc
+
+ wg.Wait()
+ return cc.err
+}
+
+func (pc *partitionConsumer) Seek(msgID MessageID) error {
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ hc := &handleSeek{
+ msgID: msgID,
+ waitGroup: wg,
+ err: nil,
+ }
+ pc.eventsChan <- hc
+
+ wg.Wait()
+ return hc.err
+}
+
+// internalSeek runs on the event loop and sends a SEEK command that moves
+// the subscription to seek.msgID.
+//
+// The waiting caller is released on every path. Seeking on a closing or
+// closed consumer, and a message ID that cannot be decoded, are reported
+// through seek.err without contacting the broker.
+func (pc *partitionConsumer) internalSeek(seek *handleSeek) {
+	// Deferred so the early returns below cannot leave Seek blocked forever.
+	defer seek.waitGroup.Done()
+
+	if pc.state == consumerClosing || pc.state == consumerClosed {
+		pc.log.Error("Consumer was already closed")
+		seek.err = fmt.Errorf("consumer was already closed")
+		return
+	}
+
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(seek.msgID.Serialize(), id); err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		seek.err = err
+		return
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
+		pb.BaseCommand_SEEK, &pb.CommandSeek{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			RequestId:  proto.Uint64(requestID),
+			MessageId:  id,
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to seek consumer")
+		seek.err = err
+	}
+}
+
+func (pc *partitionConsumer) RedeliverUnackedMessages() error {
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ hr := &handleRedeliver{
+ waitGroup: wg,
+ err: nil,
+ }
+ pc.eventsChan <- hr
+ wg.Wait()
+ return hr.err
+}
+
+// internalRedeliver runs on the event loop and asks the broker to redeliver
+// every message ID held in the overflow buffer, in chunks of at most
+// maxRedeliverUnacknowledged IDs per request.
+//
+// Afterwards the overflow buffer and the unacked-message tracker are
+// cleared. The last send error, if any, is reported through redeliver.err.
+// The waiting caller is released on every path, including when there is
+// nothing to redeliver.
+func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
+	// Deferred so the early return below cannot leave the caller blocked.
+	defer redeliver.waitGroup.Done()
+
+	pc.omu.Lock()
+	defer pc.omu.Unlock()
+
+	overFlowSize := len(pc.overflow)
+	if overFlowSize == 0 {
+		return
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+
+	for i := 0; i < overFlowSize; i += maxRedeliverUnacknowledged {
+		end := i + maxRedeliverUnacknowledged
+		if end > overFlowSize {
+			end = overFlowSize
+		}
+		_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
+			pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
+				ConsumerId: proto.Uint64(pc.consumerID),
+				MessageIds: pc.overflow[i:end],
+			})
+		if err != nil {
+			pc.log.WithError(err).Error("Failed to redeliver unacknowledged messages")
+			redeliver.err = err
+		}
+	}
+
+	// clear Overflow slice
+	pc.overflow = nil
+
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.clear()
+	}
+}
+
+// runEventsLoop executes the requests sent over eventsChan one at a time,
+// dispatching on the request's concrete type. It returns after handling a
+// close request; events of any other unknown type are ignored.
+func (pc *partitionConsumer) runEventsLoop() {
+	for {
+		event := <-pc.eventsChan
+		switch req := event.(type) {
+		case *handlerClose:
+			pc.internalClose(req)
+			return
+		case *handleSeek:
+			pc.internalSeek(req)
+		case *handleUnsubscribe:
+			pc.internalUnsubscribe(req)
+		case *handleAckCumulative:
+			pc.internalAckCumulative(req)
+		case *handleAck:
+			pc.internalAck(req)
+		case *handleRedeliver:
+			pc.internalRedeliver(req)
+		case *handleConnectionClosed:
+			pc.reconnectToBroker()
+		}
+	}
+}
+
+// internalClose runs on the event loop. It sends CLOSE_CONSUMER without
+// waiting for a response, removes this consumer's handler from the
+// connection and releases the waiting caller. It does nothing except
+// release the caller when the consumer is not in the ready state.
+func (pc *partitionConsumer) internalClose(req *handlerClose) {
+ if pc.state != consumerReady {
+ req.waitGroup.Done()
+ return
+ }
+
+ pc.state = consumerClosing
+ pc.log.Info("Closing consumer")
+
+ requestID := pc.client.rpcClient.NewRequestID()
+ _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID, pb.BaseCommand_CLOSE_CONSUMER, &pb.CommandCloseConsumer{
+ ConsumerId: proto.Uint64(pc.consumerID),
+ RequestId: proto.Uint64(requestID),
+ })
+ pc.cnx.DeleteConsumeHandler(pc.consumerID)
+
+ if err != nil {
+ // On failure the state is left at consumerClosing.
+ req.err = err
+ } else {
+ pc.log.Info("Closed consumer")
+ pc.state = consumerClosed
+ // NOTE(review): MessageChannel lives on the options struct, which may be
+ // caller-supplied and is passed by pointer to every partition consumer of
+ // a topic — confirm this channel cannot be closed more than once.
+ close(pc.options.MessageChannel)
+ //pc.cnx.UnregisterListener(pc.consumerID)
+ }
+
+ req.waitGroup.Done()
+}
+
+// internalFlow sends a FLOW command granting the broker additional permits
+// to push messages to this consumer, without waiting for a response.
+//
+// A typical consumer implementation will use a queue to accumulate these
+// messages before the application is ready to consume them. After the
+// consumer is ready, the client needs to give permission to the broker to
+// push messages.
+//
+// It returns an error when permits is zero or the command cannot be sent.
+func (pc *partitionConsumer) internalFlow(permits uint32) error {
+	// permits is unsigned, so zero is the only invalid value.
+	if permits == 0 {
+		return fmt.Errorf("invalid number of permits requested: %d", permits)
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
+		pb.BaseCommand_FLOW, &pb.CommandFlow{
+			ConsumerId:     proto.Uint64(pc.consumerID),
+			MessagePermits: proto.Uint32(permits),
+		})
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to send flow permits")
+		return err
+	}
+	return nil
+}
+
+// HandlerMessage turns a broker message into a ConsumerMessage and queues it
+// on subQueue without blocking.
+//
+// The message ID is built from the response's ledger, entry and batch index
+// plus this consumer's partition index. It returns an error when the frame
+// cannot be parsed or when subQueue is full.
+func (pc *partitionConsumer) HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error {
+ msgID := response.GetMessageId()
+
+ id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
+ int(msgID.GetBatchIndex()), pc.partitionIdx)
+
+ msgMeta, payload, err := internal.ParseMessage(headersAndPayload)
+ if err != nil {
+ return fmt.Errorf("parse message error:%s", err)
+ }
+
+ //numMsgs := msgMeta.GetNumMessagesInBatch()
+
+ msg := &message{
+ publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+ eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+ key: msgMeta.GetPartitionKey(),
+ properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
+ topic: pc.topic,
+ msgID: id,
+ payLoad: payload,
+ }
+
+ consumerMsg := ConsumerMessage{
+ Message: msg,
+ Consumer: pc,
+ }
+
+ // Non-blocking send: a full queue is reported to the caller as an error
+ // instead of stalling the goroutine that delivers messages.
+ select {
+ case pc.subQueue <- consumerMsg:
+ // Add messageId to Overflow buffer, avoiding duplicates.
+ // NOTE(review): the duplicate check is a linear scan per message and, in
+ // this file, overflow is only emptied by internalRedeliver — confirm the
+ // buffer cannot grow without bound on long-lived consumers.
+ newMid := response.GetMessageId()
+ var dup bool
+
+ pc.omu.Lock()
+ for _, mid := range pc.overflow {
+ if proto.Equal(mid, newMid) {
+ dup = true
+ break
+ }
+ }
+
+ if !dup {
+ pc.overflow = append(pc.overflow, newMid)
+ }
+ pc.omu.Unlock()
+ return nil
+ default:
+ return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+ }
+}
+
+// The types below are the requests sent over partitionConsumer.eventsChan.
+// runEventsLoop dispatches on the concrete type. Where present, waitGroup
+// is the completion signal the caller waits on and err carries the result
+// back to the caller.
+
+// handleAck requests an individual ack of msgID.
+type handleAck struct {
+ msgID MessageID
+ waitGroup *sync.WaitGroup
+ err error
+}
+
+// handleAckCumulative requests a cumulative ack of msgID. It carries no
+// completion signal.
+type handleAckCumulative struct {
+ msgID MessageID
+ err error
+}
+
+// handleUnsubscribe requests that the consumer unsubscribe.
+type handleUnsubscribe struct {
+ waitGroup *sync.WaitGroup
+ err error
+}
+
+// handleSeek requests a seek to msgID.
+type handleSeek struct {
+ msgID MessageID
+ waitGroup *sync.WaitGroup
+ err error
+}
+
+// handleRedeliver requests redelivery of unacknowledged messages.
+type handleRedeliver struct {
+ waitGroup *sync.WaitGroup
+ err error
+}
+
+// handlerClose requests that the consumer be closed; the event loop exits
+// after handling it.
+type handlerClose struct {
+ waitGroup *sync.WaitGroup
+ err error
+}
+
+// handleConnectionClosed tells the event loop to reconnect to the broker.
+type handleConnectionClosed struct{}
+
+// ConnectionClosed queues a reconnect request for the event loop. The send
+// is on the unbuffered eventsChan, so it blocks until runEventsLoop
+// receives the request.
+func (pc *partitionConsumer) ConnectionClosed() {
+ // Trigger reconnection in the consumer's event-loop goroutine
+ pc.eventsChan <- &handleConnectionClosed{}
+}
+
+// reconnectToBroker retries grabCnx with a backoff delay between attempts.
+// It returns once a reconnect succeeds or the consumer is no longer in the
+// ready state. It runs on the event loop, which handles no other request
+// until this returns.
+func (pc *partitionConsumer) reconnectToBroker() {
+	pc.log.Info("Reconnecting to broker")
+	backoff := internal.Backoff{}
+
+	// Leaving the ready state means the consumer is already closing.
+	for pc.state == consumerReady {
+		if err := pc.grabCnx(); err == nil {
+			// Successfully reconnected
+			return
+		}
+
+		delay := backoff.Next()
+		pc.log.Info("Retrying reconnection after ", delay)
+
+		time.Sleep(delay)
+	}
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index ce8a2a9..98e2156 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -157,7 +157,7 @@
p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
if p.pendingQueue.Size() > 0 {
- p.log.Infof("Resending %v pending batches", p.pendingQueue.Size())
+ p.log.Infof("Resending %d pending batches", p.pendingQueue.Size())
for it := p.pendingQueue.Iterator(); it.HasNext(); {
p.cnx.WriteData(it.Next().(*pendingItem).batchData)
}
@@ -165,8 +165,7 @@
return nil
}
-type connectionClosed struct {
-}
+type connectionClosed struct{}
func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
@@ -230,7 +229,7 @@
msg := request.msg
- sendAsBatch := !p.options.DisableBatching && request.msg.ReplicationClusters == nil
+ sendAsBatch := !p.options.DisableBatching && msg.ReplicationClusters == nil
smm := &pb.SingleMessageMetadata{
PayloadSize: proto.Int(len(msg.Payload)),
}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 497a18c..75ed9c5 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -69,23 +69,29 @@
type ProducerError struct {
partition int
- Producer
- error
+ prod Producer
+ err error
}
c := make(chan ProducerError, numPartitions)
for partitionIdx, partition := range partitions {
- go func(index int) {
- prod, err := newPartitionProducer(client, partition, options, index)
- c <- ProducerError{partitionIdx, prod, err}
- }(partitionIdx)
+ go func(partitionIdx int, partition string) {
+ prod, err := newPartitionProducer(client, partition, options, partitionIdx)
+ c <- ProducerError{
+ partition: partitionIdx,
+ prod: prod,
+ err: err,
+ }
+ }(partitionIdx, partition)
}
for i := 0; i < numPartitions; i++ {
- pe := <-c
- err = pe.error
- p.producers[pe.partition] = pe.Producer
+ pe, ok := <-c
+ if ok {
+ err = pe.err
+ p.producers[pe.partition] = pe.prod
+ }
}
if err != nil {
@@ -97,6 +103,7 @@
}
return nil, err
}
+
return p, nil
}
@@ -134,21 +141,15 @@
}
func (p *producer) Flush() error {
- var err error
for _, pp := range p.producers {
- if e := pp.Flush(); e != nil && err == nil {
- err = e
- }
+ return pp.Flush()
}
- return err
+ return nil
}
func (p *producer) Close() error {
- var err error
for _, pp := range p.producers {
- if e := pp.Close(); e != nil && err == nil {
- err = e
- }
+ return pp.Close()
}
- return err
+ return nil
}
diff --git a/pulsar/internal/checksum.go b/pulsar/internal/checksum.go
index 11d8d4f..3fc37ae 100644
--- a/pulsar/internal/checksum.go
+++ b/pulsar/internal/checksum.go
@@ -17,13 +17,34 @@
package internal
-import "hash/crc32"
+import (
+ `hash`
+ `hash/crc32`
+)
// crc32cTable holds the precomputed crc32 hash table
// used by Pulsar (crc32c)
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
+// CheckSum accumulates a CRC32 checksum over the bytes written to it. The
+// zero value is ready to use: the underlying hash is created by the first
+// Write.
+type CheckSum struct {
+ hash hash.Hash
+}
+
// Crc32cCheckSum handles computing the checksum.
func Crc32cCheckSum(data []byte) uint32 {
return crc32.Checksum(data, crc32cTable)
}
+
+// Write adds p to the running checksum, creating the hash from crc32cTable
+// on first use. It returns whatever the underlying hash's Write returns.
+func (cs *CheckSum) Write(p []byte) (int, error) {
+ if cs.hash == nil {
+ cs.hash = crc32.New(crc32cTable)
+ }
+ return cs.hash.Write(p)
+}
+
+// compute returns the checksum of everything written so far, as produced by
+// the hash's Sum(nil), or nil if nothing has been written yet.
+func (cs *CheckSum) compute() []byte {
+ if cs.hash == nil {
+ return nil
+ }
+ return cs.hash.Sum(nil)
+}
diff --git a/pulsar/internal/checksum_test.go b/pulsar/internal/checksum_test.go
new file mode 100644
index 0000000..23dc621
--- /dev/null
+++ b/pulsar/internal/checksum_test.go
@@ -0,0 +1,51 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package internal
+
+import (
+ "bytes"
+ "hash/crc32"
+ "testing"
+)
+
+// TestFrameChecksum checks that CheckSum yields nil before any write and
+// afterwards matches a CRC32 (Castagnoli) hash computed directly over the
+// same input.
+func TestFrameChecksum(t *testing.T) {
+	payload := []byte{1, 2, 3, 4, 5}
+	var sum CheckSum
+
+	if got := sum.compute(); got != nil {
+		t.Fatalf("compute() = %v; expected nil", got)
+	}
+
+	if _, err := sum.Write(payload); err != nil {
+		t.Fatalf("Write() err = %v; expected nil", err)
+	}
+
+	reference := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+	if _, err := reference.Write(payload); err != nil {
+		t.Fatal(err)
+	}
+
+	got, expected := sum.compute(), reference.Sum(nil)
+	if !bytes.Equal(got, expected) {
+		t.Fatalf("compute() = %x; expected %x", got, expected)
+	}
+	t.Logf("compute() = 0x%x", got)
+}
+
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 05ba409..5068ebb 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,16 +18,21 @@
package internal
import (
- "github.com/apache/pulsar-client-go/pkg/pb"
- "github.com/golang/protobuf/proto"
+ `bytes`
+ `encoding/binary`
+ `fmt`
+ "github.com/golang/protobuf/proto"
+ `io`
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ log "github.com/sirupsen/logrus"
)
-// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-const MaxFrameSize = 5 * 1024 * 1024
-
-const magicCrc32c uint16 = 0x0e01
+const (
+ // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
+ MaxFrameSize = 5 * 1024 * 1024
+ magicCrc32c uint16 = 0x0e01
+)
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
cmd := &pb.BaseCommand{
@@ -42,6 +47,10 @@
cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
case pb.BaseCommand_PRODUCER:
cmd.Producer = msg.(*pb.CommandProducer)
+ case pb.BaseCommand_SUBSCRIBE:
+ cmd.Subscribe = msg.(*pb.CommandSubscribe)
+ case pb.BaseCommand_FLOW:
+ cmd.Flow = msg.(*pb.CommandFlow)
case pb.BaseCommand_PING:
cmd.Ping = msg.(*pb.CommandPing)
case pb.BaseCommand_PONG:
@@ -50,6 +59,16 @@
cmd.Send = msg.(*pb.CommandSend)
case pb.BaseCommand_CLOSE_PRODUCER:
cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
+ case pb.BaseCommand_CLOSE_CONSUMER:
+ cmd.CloseConsumer = msg.(*pb.CommandCloseConsumer)
+ case pb.BaseCommand_ACK:
+ cmd.Ack = msg.(*pb.CommandAck)
+ case pb.BaseCommand_SEEK:
+ cmd.Seek = msg.(*pb.CommandSeek)
+ case pb.BaseCommand_UNSUBSCRIBE:
+ cmd.Unsubscribe = msg.(*pb.CommandUnsubscribe)
+ case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
+ cmd.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
default:
log.Panic("Missing command type: ", cmdType)
}
@@ -67,6 +86,110 @@
wb.Write(payload)
}
+// ParseMessage decodes the headers-and-payload section of a received frame
+// into its protobuf message metadata and the raw payload bytes.
+//
+// Layout handled, in order:
+//   - optional 2-byte magic number (magicCrc32c) followed by a 4-byte checksum
+//   - 4-byte big-endian metadata size
+//   - protobuf-encoded pb.MessageMetadata of that size
+//   - 2 bytes that are read and discarded
+//   - all remaining bytes, returned as payload
+//
+// An empty input yields (nil, nil, nil). When a checksum is present, it is
+// verified against every byte that follows it. A truncated section, a
+// metadata or payload length above MaxFrameSize, a metadata decode failure
+// or a checksum mismatch is reported through err with nil results.
+func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payload []byte, err error) {
+	// reusable buffer for 4-byte uint32s
+	buf32 := make([]byte, 4)
+	// Wrap the reader so that lr.N always tells how many bytes of the
+	// section are still unread.
+	lr := &io.LimitedReader{
+		N: int64(len(headersAndPayload)),
+		R: bytes.NewReader(headersAndPayload),
+	}
+
+	// The message may optionally stop here. If so,
+	// this is a "simple" command.
+	if lr.N <= 0 {
+		return nil, nil, nil
+	}
+
+	// Optionally, the next 2 bytes may be the magicNumber. If
+	// so, it indicates that the following 4 bytes are a checksum.
+	// If not, the following 2 bytes (plus the 2 bytes already read),
+	// are the metadataSize, which is why a 4 byte buffer is used.
+	if _, err = io.ReadFull(lr, buf32); err != nil {
+		return nil, nil, err
+	}
+
+	// Check for magicNumber which indicates a checksum
+	var chksum CheckSum
+	var expectedChksum []byte
+
+	if binary.BigEndian.Uint16(buf32[:2]) == magicCrc32c {
+		expectedChksum = make([]byte, 4)
+
+		// We already read the 2-byte magicNumber and the
+		// initial 2 bytes of the checksum
+		copy(expectedChksum, buf32[2:])
+
+		// Read the remaining 2 bytes of the checksum
+		if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil {
+			return nil, nil, err
+		}
+
+		// Use a tee reader to compute the checksum
+		// of everything consumed after this point
+		lr.R = io.TeeReader(lr.R, &chksum)
+
+		// Fill buffer with metadata size, which is what it
+		// would already contain if there were no magic number / checksum
+		if _, err = io.ReadFull(lr, buf32); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	// Read metadataSize
+	metadataSize := binary.BigEndian.Uint32(buf32)
+	// guard against allocating large buffer
+	if metadataSize > MaxFrameSize {
+		return nil, nil, fmt.Errorf("frame metadata size (%d) "+
+			"cannot be greater than max frame size (%d)", metadataSize, MaxFrameSize)
+	}
+
+	// Read protobuf encoded metadata
+	metaBuf := make([]byte, metadataSize)
+	if _, err = io.ReadFull(lr, metaBuf); err != nil {
+		return nil, nil, err
+	}
+	msgMeta = new(pb.MessageMetadata)
+	if err = proto.Unmarshal(metaBuf, msgMeta); err != nil {
+		return nil, nil, err
+	}
+
+	// NOTE(review): these 2 bytes are consumed and never used, so they are
+	// excluded from the returned payload and a section that ends right
+	// after the metadata fails with an EOF error. Confirm against the
+	// sender's wire format that this is intended.
+	batchLen := make([]byte, 2)
+	if _, err = io.ReadFull(lr, batchLen); err != nil {
+		return nil, nil, err
+	}
+
+	// Anything left in the frame is considered
+	// the payload and can be any sequence of bytes.
+	if lr.N > 0 {
+		// guard against allocating large buffer
+		if lr.N > MaxFrameSize {
+			return nil, nil, fmt.Errorf("frame payload size (%d) "+
+				"cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
+		}
+		payload = make([]byte, lr.N)
+		if _, err = io.ReadFull(lr, payload); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	// Without a magic number expectedChksum stays nil and nothing was fed
+	// into chksum, so both sides of the comparison are empty.
+	if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
+		return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
+			"not match given checksum (0x%X)", computed, expectedChksum)
+	}
+
+	return msgMeta, payload, nil
+}
+
func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7d5ee74..11337ec 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -58,9 +58,15 @@
WriteData(data []byte)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
+ AddConsumeHandler(id uint64, handler ConsumerHandler)
+ DeleteConsumeHandler(id uint64)
Close()
}
+// ConsumerHandler receives the messages that a connection dispatches to a
+// registered consumer.
+type ConsumerHandler interface {
+ // HandlerMessage is invoked with the decoded message command and the
+ // raw headers-and-payload bytes that accompanied it. A non-nil error
+ // is logged by the connection.
+ HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error
+}
+
type connectionState int
const (
@@ -101,6 +107,7 @@
writeRequests chan []byte
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener
+ connWrapper *ConnWrapper
tlsOptions *TLSOptions
auth auth.Provider
@@ -122,6 +129,7 @@
incomingRequests: make(chan *request),
writeRequests: make(chan []byte),
listeners: make(map[uint64]ConnectionListener),
+ connWrapper: NewConnWrapper(),
}
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
@@ -295,6 +303,7 @@
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
c.lastDataReceivedTime = time.Now()
+ var err error
switch *cmd.Type {
case pb.BaseCommand_SUCCESS:
@@ -331,6 +340,7 @@
case pb.BaseCommand_SEND_ERROR:
case pb.BaseCommand_MESSAGE:
+ err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
case pb.BaseCommand_PING:
c.handlePing()
case pb.BaseCommand_PONG:
@@ -339,7 +349,9 @@
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
default:
- c.log.Errorf("Received invalid command type: %s", cmd.Type)
+ if err != nil {
+ c.log.Errorf("Received invalid command type: %s", cmd.Type)
+ }
c.Close()
}
}
@@ -382,6 +394,21 @@
}
}
+// handleMessage dispatches an incoming message command to the consumer
+// handler registered for the command's consumer id.
+//
+// A message for an unknown consumer id is logged and ignored (nil is
+// returned). An error returned by the handler is logged together with the
+// message id and passed back to the caller unchanged.
+func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
+	c.log.Debug("Got Message: ", response)
+	consumerID := response.GetConsumerId()
+
+	// The handler map is modified under Rwmu by AddConsumeHandler and
+	// DeleteConsumeHandler, so the lookup has to hold the read lock. The
+	// lock is released before the handler runs so that a slow handler
+	// does not block registration changes.
+	c.connWrapper.Rwmu.RLock()
+	consumer, ok := c.connWrapper.Consumers[consumerID]
+	c.connWrapper.Rwmu.RUnlock()
+
+	if !ok {
+		c.log.WithField("consumerId", consumerID).Warn("Got unexpected message: ", response.MessageId)
+		return nil
+	}
+
+	if err := consumer.HandlerMessage(response, payload); err != nil {
+		c.log.WithField("consumerId", consumerID).WithError(err).
+			Error("handle message err: ", response.MessageId)
+		return err
+	}
+	return nil
+}
+
func (c *connection) sendPing() {
if c.lastDataReceivedTime.Add(2 * keepAliveInterval).Before(time.Now()) {
// We have not received a response to the previous Ping request, the
@@ -485,3 +512,26 @@
return tlsConfig, nil
}
+
+// ConnWrapper holds the consumer handlers registered on a connection.
+type ConnWrapper struct {
+ // Rwmu is locked by AddConsumeHandler and DeleteConsumeHandler while
+ // they modify Consumers.
+ Rwmu sync.RWMutex
+ // Consumers maps the id passed to AddConsumeHandler to its handler.
+ Consumers map[uint64]ConsumerHandler
+}
+
+// NewConnWrapper returns a ConnWrapper whose handler map is initialised and
+// empty.
+func NewConnWrapper() *ConnWrapper {
+	wrapper := new(ConnWrapper)
+	wrapper.Consumers = make(map[uint64]ConsumerHandler)
+	return wrapper
+}
+
+// AddConsumeHandler stores handler under id, replacing any handler that was
+// previously stored for the same id.
+func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
+	wrapper := c.connWrapper
+
+	wrapper.Rwmu.Lock()
+	defer wrapper.Rwmu.Unlock()
+
+	wrapper.Consumers[id] = handler
+}
+
+// DeleteConsumeHandler drops the handler stored under id; it is a no-op
+// when no handler is stored for that id.
+func (c *connection) DeleteConsumeHandler(id uint64) {
+	wrapper := c.connWrapper
+
+	wrapper.Rwmu.Lock()
+	defer wrapper.Rwmu.Unlock()
+
+	delete(wrapper.Consumers, id)
+}
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index 8baddf3..de7f058 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -95,6 +95,12 @@
return nil, nil
}
+// RequestOnCnxNoWait is present so that mockedRPCClient provides this
+// method; the tests using this mock are not expected to reach it, so any
+// call fails the running test and returns nil results.
+func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
+ message proto.Message) (*RPCResult, error) {
+ assert.Fail(c.t, "Shouldn't be called")
+ return nil, nil
+}
+
func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
return &r
}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 8234dc8..2b68a71 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,6 +45,8 @@
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
+ RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
+
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}
@@ -110,6 +112,19 @@
return rpcResult, nil
}
+// RequestOnCnxNoWait sends the command built from cmdType and message on
+// cnx and returns right after handing it to the connection.
+//
+// The returned RPCResult always carries cnx. Its Response field is filled
+// in by the callback passed to SendRequest, which this method does not wait
+// for. The returned error is always nil.
+//
+// NOTE(review): Response is written by the callback without
+// synchronisation; confirm that callers do not read it concurrently.
+func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
+	message proto.Message) (*RPCResult, error) {
+	rpcResult := &RPCResult{
+		Cnx: cnx,
+	}
+
+	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+		rpcResult.Response = response
+	})
+
+	return rpcResult, nil
+}
+
func (c *rpcClient) NewRequestID() uint64 {
return atomic.AddUint64(&c.requestIDGenerator, 1)
}
diff --git a/pulsar/message.go b/pulsar/message.go
index 1862c18..a70827d 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -62,7 +62,7 @@
// EventTime get the event time associated with this message. It is typically set by the applications via
// `ProducerMessage.EventTime`.
// If there isn't any event time associated with this event, it will be nil.
- EventTime() *time.Time
+ EventTime() time.Time
// Key get the key of the message, if any
Key() string
diff --git a/pulsar/unackMsgTracker_test.go b/pulsar/unackMsgTracker_test.go
new file mode 100644
index 0000000..d292fc8
--- /dev/null
+++ b/pulsar/unackMsgTracker_test.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ `github.com/apache/pulsar-client-go/pkg/pb`
+ `github.com/golang/protobuf/proto`
+ `github.com/stretchr/testify/assert`
+ `testing`
+)
+
+// TestUnackedMessageTracker adds a handful of message ids to a fresh
+// tracker, checks the reported size and emptiness, then removes the ids one
+// by one and checks that the tracker ends up empty.
+func TestUnackedMessageTracker(t *testing.T) {
+	const total = 5
+
+	unAckTracker := NewUnackedMessageTracker()
+
+	msgIDs := make([]*pb.MessageIdData, 0, total)
+	for i := 0; i < total; i++ {
+		msgIDs = append(msgIDs, &pb.MessageIdData{
+			LedgerId:   proto.Uint64(1),
+			EntryId:    proto.Uint64(uint64(i)),
+			Partition:  proto.Int32(-1),
+			BatchIndex: proto.Int32(-1),
+		})
+	}
+
+	for _, msgID := range msgIDs {
+		assert.True(t, unAckTracker.Add(msgID))
+	}
+
+	assert.False(t, unAckTracker.IsEmpty())
+	// assert.Equal takes the expected value first, then the actual one.
+	assert.Equal(t, total, unAckTracker.Size())
+
+	for index, msgID := range msgIDs {
+		unAckTracker.Remove(msgID)
+		assert.Equal(t, total-1-index, unAckTracker.Size())
+	}
+
+	assert.Equal(t, 0, unAckTracker.Size())
+	assert.True(t, unAckTracker.IsEmpty())
+}
diff --git a/pulsar/unackedMsgTracker.go b/pulsar/unackedMsgTracker.go
new file mode 100644
index 0000000..8ec51c6
--- /dev/null
+++ b/pulsar/unackedMsgTracker.go
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "sync"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/golang/protobuf/proto"
+
+ set "github.com/deckarep/golang-set"
+ log "github.com/sirupsen/logrus"
+)
+
+// UnackedMessageTracker keeps message ids in two sets that are swapped
+// after every tick of the timeout ticker. Ids still present in oldOpenSet
+// when a tick fires are sent back for redelivery by handlerCmd.
+type UnackedMessageTracker struct {
+ cmu sync.RWMutex // protects following
+ // currentSet receives the ids passed to Add.
+ currentSet set.Set
+ // oldOpenSet holds what was currentSet before the last toggle.
+ oldOpenSet set.Set
+ // timeout is created by Start and stopped by Stop.
+ timeout *time.Ticker
+
+ // pc, when non-nil, is the consumer used for redelivery requests.
+ // NOTE(review): nothing in this file assigns pc or pcs; presumably the
+ // owning consumer sets them. Confirm.
+ pc *partitionConsumer
+ // pcs is consulted when pc is nil; handlerCmd pairs each entry with the
+ // ids whose partition equals the entry's slice index.
+ pcs []*partitionConsumer
+}
+
+// NewUnackedMessageTracker returns a tracker whose two id sets are
+// initialised and empty. No ticker is created until Start is called.
+func NewUnackedMessageTracker() *UnackedMessageTracker {
+	return &UnackedMessageTracker{
+		currentSet: set.NewSet(),
+		oldOpenSet: set.NewSet(),
+	}
+}
+
+// Size returns the number of ids held in the current set plus the number
+// held in the old open set.
+func (t *UnackedMessageTracker) Size() int {
+	// Nothing is modified here, so the shared lock is sufficient; this
+	// matches IsEmpty and isAckTimeout.
+	t.cmu.RLock()
+	defer t.cmu.RUnlock()
+
+	return t.currentSet.Cardinality() + t.oldOpenSet.Cardinality()
+}
+
+// IsEmpty reports whether both the current set and the old open set hold
+// no ids.
+func (t *UnackedMessageTracker) IsEmpty() bool {
+	t.cmu.RLock()
+	defer t.cmu.RUnlock()
+
+	if t.currentSet.Cardinality() != 0 {
+		return false
+	}
+	return t.oldOpenSet.Cardinality() == 0
+}
+
+// Add removes id from the old open set and inserts it into the current
+// set. The result is whatever the current set's Add reports.
+func (t *UnackedMessageTracker) Add(id *pb.MessageIdData) bool {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	t.oldOpenSet.Remove(id)
+	inserted := t.currentSet.Add(id)
+	return inserted
+}
+
+// Remove deletes id from the current set and from the old open set.
+func (t *UnackedMessageTracker) Remove(id *pb.MessageIdData) {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	for _, ids := range []set.Set{t.currentSet, t.oldOpenSet} {
+		ids.Remove(id)
+	}
+}
+
+// clear empties the current set and the old open set.
+func (t *UnackedMessageTracker) clear() {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	for _, ids := range []set.Set{t.currentSet, t.oldOpenSet} {
+		ids.Clear()
+	}
+}
+
+// toggle exchanges the current set and the old open set.
+func (t *UnackedMessageTracker) toggle() {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	previous := t.currentSet
+	t.currentSet = t.oldOpenSet
+	t.oldOpenSet = previous
+}
+
+// isAckTimeout reports whether the old open set still holds at least one
+// id.
+func (t *UnackedMessageTracker) isAckTimeout() bool {
+	t.cmu.RLock()
+	defer t.cmu.RUnlock()
+
+	return t.oldOpenSet.Cardinality() != 0
+}
+
+// lessThanOrEqual reports whether id1 is positioned at or before id2.
+//
+// Ids of different partitions are never ordered (false). Within a
+// partition the ledger id decides first; the entry id is compared only when
+// both ids are in the same ledger, so an id in a later ledger is never
+// treated as smaller just because its entry id is small.
+func (t *UnackedMessageTracker) lessThanOrEqual(id1, id2 pb.MessageIdData) bool {
+	if id1.GetPartition() != id2.GetPartition() {
+		return false
+	}
+	if id1.GetLedgerId() != id2.GetLedgerId() {
+		return id1.GetLedgerId() < id2.GetLedgerId()
+	}
+	return id1.GetEntryId() <= id2.GetEntryId()
+}
+
+// RemoveMessagesTill removes, from both the current set and the old open
+// set, every id that lessThanOrEqual places at or before id. It returns the
+// number of ids removed.
+func (t *UnackedMessageTracker) RemoveMessagesTill(id pb.MessageIdData) int {
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	return t.removeTill(t.currentSet, id) + t.removeTill(t.oldOpenSet, id)
+}
+
+// removeTill removes from ids every element at or before id and returns how
+// many were removed. The caller must hold t.cmu.
+func (t *UnackedMessageTracker) removeTill(ids set.Set, id pb.MessageIdData) int {
+	removed := 0
+
+	// Iterate over a snapshot: elements are removed while walking, which
+	// must not happen from inside the set's own Each callback.
+	for _, elem := range ids.ToSlice() {
+		// Add stores *pb.MessageIdData, so that is the type to assert.
+		msgID, ok := elem.(*pb.MessageIdData)
+		if !ok || msgID == nil {
+			continue
+		}
+		if t.lessThanOrEqual(*msgID, id) {
+			ids.Remove(elem)
+			removed++
+		}
+	}
+
+	return removed
+}
+
+// Start creates the timeout ticker with a period of ackTimeoutMillis
+// milliseconds and launches handlerCmd in a new goroutine.
+func (t *UnackedMessageTracker) Start(ackTimeoutMillis int64) {
+	interval := time.Duration(ackTimeoutMillis) * time.Millisecond
+
+	t.cmu.Lock()
+	defer t.cmu.Unlock()
+
+	t.timeout = time.NewTicker(interval)
+	go t.handlerCmd(ackTimeoutMillis)
+}
+
+// handlerCmd runs the redelivery loop driven by the timeout ticker.
+//
+// On every tick, if the old open set is not empty, its ids are collected,
+// the set is cleared and the ids are sent in a redelivery request: through
+// pc when it is set, otherwise grouped by partition and sent through the
+// entry of pcs whose index equals the partition. After each tick the two
+// sets are swapped. ackTimeoutMillis is not used inside the loop.
+//
+// NOTE(review): a failed request ends this goroutine for good, so no
+// further timeouts are processed afterwards; confirm that this is intended.
+func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
+	for tick := range t.timeout.C {
+		if t.isAckTimeout() {
+			log.Debugf(" %d messages have timed-out", t.oldOpenSet.Cardinality())
+			messageIds := make([]*pb.MessageIdData, 0)
+
+			// Returning false keeps the iteration going over all elements.
+			t.oldOpenSet.Each(func(i interface{}) bool {
+				messageIds = append(messageIds, i.(*pb.MessageIdData))
+				return false
+			})
+
+			log.Debugf("messageID length is:%d", len(messageIds))
+
+			t.oldOpenSet.Clear()
+
+			if t.pc != nil {
+				if err := t.redeliver(t.pc, messageIds); err != nil {
+					return
+				}
+
+				log.Debugf("consumer:%v redeliver messages num:%d", t.pc.consumerName, len(messageIds))
+			} else if t.pcs != nil {
+				messageIdsMap := make(map[int32][]*pb.MessageIdData)
+				for _, msgID := range messageIds {
+					messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
+				}
+
+				for index, subConsumer := range t.pcs {
+					ids := messageIdsMap[int32(index)]
+					if ids == nil {
+						continue
+					}
+					if err := t.redeliver(subConsumer, ids); err != nil {
+						return
+					}
+				}
+			}
+		}
+		log.Debug("Tick at ", tick)
+
+		t.toggle()
+	}
+}
+
+// redeliver sends a redeliver-unacknowledged-messages request for ids on
+// behalf of pc. A failure is logged on pc's logger and returned.
+//
+// NOTE(review): RequestOnCnx presumably waits for a broker response;
+// confirm that the broker answers this command type.
+func (t *UnackedMessageTracker) redeliver(pc *partitionConsumer, ids []*pb.MessageIdData) error {
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmd := &pb.CommandRedeliverUnacknowledgedMessages{
+		ConsumerId: proto.Uint64(pc.consumerID),
+		MessageIds: ids,
+	}
+
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
+		pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to redeliver unacknowledged messages")
+	}
+	return err
+}
+
+// Stop halts the timeout ticker, if Start created one, and then removes
+// every id from both sets. It is safe to call on a tracker that was never
+// started.
+//
+// NOTE(review): stopping a time.Ticker does not close its channel, so the
+// goroutine launched by Start stays blocked on it after Stop; confirm
+// whether it should be given a way to exit.
+func (t *UnackedMessageTracker) Stop() {
+	// Start assigns the ticker under cmu, so read it under the lock too.
+	// The lock is released before clear, which acquires it itself.
+	t.cmu.RLock()
+	ticker := t.timeout
+	t.cmu.RUnlock()
+
+	if ticker != nil {
+		ticker.Stop()
+		log.Debug("stop ticker ", ticker)
+	}
+
+	t.clear()
+}
diff --git a/util/error.go b/util/error.go
new file mode 100644
index 0000000..6a051f5
--- /dev/null
+++ b/util/error.go
@@ -0,0 +1,50 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package util
+
+import (
+ `fmt`
+ `github.com/apache/pulsar-client-go/pkg/pb`
+)
+
+// NewUnexpectedErrMsg builds an UnexpectedErrMsg for the given command
+// type. Any ids passed along are kept and appended to the error text to
+// give additional context.
+func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *UnexpectedErrMsg {
+	unexpected := new(UnexpectedErrMsg)
+	unexpected.msgType = msgType
+	unexpected.ids = ids
+	return unexpected
+}
+
+// UnexpectedErrMsg is returned when an unexpected message is received.
+type UnexpectedErrMsg struct {
+ // msgType is the command type of the unexpected message; Error prints
+ // its String() form.
+ msgType pb.BaseCommand_Type
+ // ids are optional values that Error appends as " id=<value>" each.
+ ids []interface{}
+}
+
+// Error implements the error interface. The text names the quoted message
+// type followed by one " id=<value>" entry per stored id.
+func (e *UnexpectedErrMsg) Error() string {
+	text := fmt.Sprintf("received unexpected message of type %q", e.msgType.String())
+	for idx := range e.ids {
+		text += " id=" + fmt.Sprint(e.ids[idx])
+	}
+	return text
+}
diff --git a/util/util.go b/util/util.go
new file mode 100644
index 0000000..06a7e53
--- /dev/null
+++ b/util/util.go
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package util
+
+import (
+ `reflect`
+)
+
+// IsNil reports whether i is nil. That covers an interface holding no value
+// at all as well as an interface wrapping a nil pointer, map, slice,
+// channel or function. Every other value yields false.
+func IsNil(i interface{}) bool {
+	// reflect.ValueOf(nil) yields the zero Value, whose Kind is Invalid,
+	// so the plain nil interface has to be handled up front.
+	if i == nil {
+		return true
+	}
+
+	vi := reflect.ValueOf(i)
+	switch vi.Kind() {
+	case reflect.Chan, reflect.Func, reflect.Map, reflect.Ptr, reflect.Slice:
+		return vi.IsNil()
+	}
+	return false
+}
+
diff --git a/util/util_test.go b/util/util_test.go
new file mode 100644
index 0000000..2e1195c
--- /dev/null
+++ b/util/util_test.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package util
+
+import (
+ `github.com/stretchr/testify/assert`
+ `testing`
+)
+
+// TestIsNil shows why IsNil exists: an interface wrapping a typed nil
+// pointer does not compare equal to nil, yet IsNil recognises it.
+func TestIsNil(t *testing.T) {
+	var a interface{} = nil
+	var b interface{} = (*int)(nil)
+
+	assert.True(t, a == nil)
+	assert.False(t, b == nil)
+
+	// The typed nil pointer is reported as nil.
+	assert.True(t, IsNil(b))
+
+	// Non-nil pointers and plain values are not.
+	value := 1
+	assert.False(t, IsNil(&value))
+	assert.False(t, IsNil(value))
+}