blob: 68cbdd62f7e46e1da48f098111d20a2db960c50d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"testing"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
// TestSingleMessageIDNoAckTracker pushes rawCompatSingleMessage through
// MessageReceived and verifies that every resulting message id has a nil
// ack tracker, and that acking the first id leaves an event waiting on
// eventsCh.
func TestSingleMessageIDNoAckTracker(t *testing.T) {
	eventsCh := make(chan interface{}, 1)
	pc := partitionConsumer{
		queueCh:              make(chan []*message, 1),
		eventsCh:             eventsCh,
		compressionProviders: make(map[pb.CompressionType]compression.Provider),
		options:              &partitionConsumerOpts{},
	}

	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
		t.Fatal(err)
	}

	messages := <-pc.queueCh
	// Fail with a readable message instead of an index-out-of-range panic
	// when nothing was delivered.
	if len(messages) == 0 {
		t.Fatal("expected at least one message on queueCh, got none")
	}

	// ensure NO tracker was set on the message id
	for _, m := range messages {
		assert.Nil(t, m.ID().(trackingMessageID).tracker)
	}

	// ack the message id
	pc.AckID(messages[0].msgID.(trackingMessageID))

	// Non-blocking receive: the event must already be on the channel.
	select {
	case <-eventsCh:
	default:
		t.Error("Expected an ack request to be triggered!")
	}
}
// TestBatchMessageIDNoAckTracker pushes rawBatchMessage1 (a batch holding a
// single entry) through MessageReceived and verifies that every resulting
// message id has a nil ack tracker, and that acking the first id leaves an
// event waiting on eventsCh.
func TestBatchMessageIDNoAckTracker(t *testing.T) {
	eventsCh := make(chan interface{}, 1)
	pc := partitionConsumer{
		queueCh:              make(chan []*message, 1),
		eventsCh:             eventsCh,
		compressionProviders: make(map[pb.CompressionType]compression.Provider),
		options:              &partitionConsumerOpts{},
	}

	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
		t.Fatal(err)
	}

	messages := <-pc.queueCh
	// Fail with a readable message instead of an index-out-of-range panic
	// when nothing was delivered.
	if len(messages) == 0 {
		t.Fatal("expected at least one message on queueCh, got none")
	}

	// ensure NO tracker was set on the message id
	for _, m := range messages {
		assert.Nil(t, m.ID().(trackingMessageID).tracker)
	}

	// ack the message id
	pc.AckID(messages[0].msgID.(trackingMessageID))

	// Non-blocking receive: the event must already be on the channel.
	select {
	case <-eventsCh:
	default:
		t.Error("Expected an ack request to be triggered!")
	}
}
// TestBatchMessageIDWithAckTracker pushes rawBatchMessage10 through
// MessageReceived and verifies that every resulting message id has a non-nil
// ack tracker. It then checks that no event appears on eventsCh while only
// the first nine ids are acked, and that one does appear once the last id is
// acked as well.
func TestBatchMessageIDWithAckTracker(t *testing.T) {
	// Number of entries the test expects rawBatchMessage10 to produce.
	const batchSize = 10

	eventsCh := make(chan interface{}, 1)
	pc := partitionConsumer{
		queueCh:              make(chan []*message, 1),
		eventsCh:             eventsCh,
		compressionProviders: make(map[pb.CompressionType]compression.Provider),
		options:              &partitionConsumerOpts{},
	}

	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
		t.Fatal(err)
	}

	messages := <-pc.queueCh
	// Fail with a readable message instead of an index-out-of-range panic
	// when the batch does not have the expected size.
	if len(messages) != batchSize {
		t.Fatalf("expected %d messages in the batch, got %d", batchSize, len(messages))
	}

	// ensure the tracker was set on the message id
	for _, m := range messages {
		assert.NotNil(t, m.ID().(trackingMessageID).tracker)
	}

	// ack all message ids except the last one
	last := batchSize - 1
	for _, m := range messages[:last] {
		pc.AckID(m.msgID.(trackingMessageID))
	}

	// Non-blocking receive: nothing may have been queued yet.
	select {
	case <-eventsCh:
		t.Error("The message id should not be acked!")
	default:
	}

	// ack last message
	pc.AckID(messages[last].msgID.(trackingMessageID))

	// Non-blocking receive: the event must already be on the channel.
	select {
	case <-eventsCh:
	default:
		t.Error("Expected an ack request to be triggered!")
	}
}
// rawCompatSingleMessage is the fixture fed to MessageReceived by
// TestSingleMessageIDNoAckTracker.
//
// Raw single message in old format
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
//
// The trailing five bytes (0x68 0x65 0x6c 0x6c 0x6f) are the ASCII payload.
// NOTE(review): the leading 0x0e, 0x01 plus the next four bytes look like a
// magic number followed by a checksum — confirm against the wire-format docs
// before editing any byte by hand.
var rawCompatSingleMessage = []byte{
0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00,
0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef,
0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01,
0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01,
0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05,
0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
}
// rawBatchMessage1 is the fixture fed to MessageReceived by
// TestBatchMessageIDNoAckTracker.
//
// Message with batch of 1
// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
//
// The trailing five bytes (0x68 0x65 0x6c 0x6c 0x6f) are the ASCII payload.
// NOTE(review): the leading 0x0e, 0x01 plus the next four bytes look like a
// magic number followed by a checksum — confirm against the wire-format docs
// before editing any byte by hand.
var rawBatchMessage1 = []byte{
0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00,
0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80,
0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01,
0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
}
// rawBatchMessage10 is the fixture fed to MessageReceived by
// TestBatchMessageIDWithAckTracker, which indexes ten messages out of it.
//
// Message with batch of 10
// The body repeats the same entry layout as rawBatchMessage1 ten times:
// each entry carries the key/value byte pairs "a"/"1" and "b"/"2" and ends
// with the ASCII payload "hello" (0x68 0x65 0x6c 0x6c 0x6f).
// The byte following each 0x40 counts up from 0x00 to 0x09 across entries.
// NOTE(review): that counter is presumably a per-entry sequence id, and the
// leading 0x0e, 0x01 plus four bytes a magic number and checksum — confirm
// against the wire-format docs before editing any byte by hand.
var rawBatchMessage10 = []byte{
0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08,
0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74,
0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65,
0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18,
0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a,
0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a,
0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c,
0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01,
0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01,
0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05,
0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00,
0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61,
0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62,
0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40,
0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00,
0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12,
0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12,
0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05,
0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00,
0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01,
0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01,
0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68,
0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16,
0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31,
0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32,
0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65,
0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a,
0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
0x6f,
}