blob: 46e6f226f162e2ca858e0c98cb1cdf6721c61316 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
func TestMultiTopicConsumerReceive(t *testing.T) {
topic1 := newTopicName()
topic2 := newTopicName()
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
t.Fatal(err)
}
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
t.Fatal(err)
}
defer consumer.Close()
// produce messages
for i, topic := range topics {
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
t.Fatal(err)
}
err = genMessages(p, 10, func(idx int) string {
return fmt.Sprintf("topic-%d-hello-%d", i+1, idx)
})
p.Close()
if err != nil {
t.Fatal(err)
}
}
receivedTopic1 := 0
receivedTopic2 := 0
// nolint
for receivedTopic1+receivedTopic2 < 20 {
select {
case cm, ok := <-consumer.Chan():
if ok {
msg := string(cm.Payload())
if strings.HasPrefix(msg, "topic-1") {
receivedTopic1++
} else if strings.HasPrefix(msg, "topic-2") {
receivedTopic2++
}
consumer.Ack(cm.Message)
} else {
t.Fail()
}
}
}
assert.Equal(t, receivedTopic1, receivedTopic2)
}
func TestMultiTopicConsumerUnsubscribe(t *testing.T) {
topic1 := newTopicName()
topic2 := newTopicName()
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
t.Fatal(err)
}
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
t.Fatal(err)
}
defer consumer.Close()
err = consumer.Unsubscribe()
assert.Nil(t, err)
err = consumer.Unsubscribe()
assert.Error(t, err)
}
func TestMultiTopicConsumerForceUnsubscribe(t *testing.T) {
topic1 := newTopicName()
topic2 := newTopicName()
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
t.Fatal(err)
}
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
t.Fatal(err)
}
defer consumer.Close()
err = consumer.UnsubscribeForce()
assert.Nil(t, err)
err = consumer.UnsubscribeForce()
assert.Error(t, err)
}
func TestMultiTopicGetLastMessageIDs(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic1Partition, topic2Partition, topic3Partition := 1, 2, 3
topic1 := newTopicName()
err = createPartitionedTopic(topic1, topic1Partition)
assert.Nil(t, err)
topic2 := newTopicName()
err = createPartitionedTopic(topic2, topic2Partition)
assert.Nil(t, err)
topic3 := newTopicName()
err = createPartitionedTopic(topic3, topic3Partition)
assert.Nil(t, err)
topics := []string{topic1, topic2, topic3}
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "my-sub",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()
// produce messages
totalMessage := 30
for i, topic := range topics {
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
t.Fatal(err)
}
err = genMessages(p, totalMessage, func(idx int) string {
return fmt.Sprintf("topic-%d-hello-%d", i+1, idx)
})
p.Close()
if err != nil {
assert.Nil(t, err)
}
}
// create admin
admin, err := pulsaradmin.NewClient(&config.Config{})
assert.Nil(t, err)
topicMessageIDs, err := consumer.GetLastMessageIDs()
assert.Nil(t, err)
assert.Equal(t, topic1Partition+topic2Partition+topic3Partition, len(topicMessageIDs))
for _, id := range topicMessageIDs {
if strings.Contains(id.Topic(), topic1) {
assert.Equal(t, int(id.EntryID()), totalMessage/topic1Partition-1)
} else if strings.Contains(id.Topic(), topic2) {
assert.Equal(t, int(id.EntryID()), totalMessage/topic2Partition-1)
} else if strings.Contains(id.Topic(), topic3) {
assert.Equal(t, int(id.EntryID()), totalMessage/topic3Partition-1)
}
topicName, err := utils.GetTopicName(id.Topic())
assert.Nil(t, err)
messages, err := admin.Subscriptions().GetMessagesByID(*topicName, id.LedgerID(), id.EntryID())
assert.Nil(t, err)
assert.Equal(t, 1, len(messages))
}
}
func TestMultiTopicAckIDList(t *testing.T) {
for _, params := range []bool{true, false} {
t.Run(fmt.Sprintf("TestMultiTopicConsumerAckIDList%v", params), func(t *testing.T) {
runMultiTopicAckIDList(t, params)
})
}
}
func runMultiTopicAckIDList(t *testing.T, regex bool) {
topicPrefix := fmt.Sprintf("multiTopicAckIDList%v", time.Now().UnixNano())
topic1 := "persistent://public/default/" + topicPrefix + "1"
topic2 := "persistent://public/default/" + topicPrefix + "2"
client, err := NewClient(ClientOptions{URL: "pulsar://localhost:6650"})
assert.Nil(t, err)
defer client.Close()
if regex {
admin, err := pulsaradmin.NewClient(&config.Config{})
assert.Nil(t, err)
for _, topic := range []string{topic1, topic2} {
topicName, err := utils.GetTopicName(topic)
assert.Nil(t, err)
admin.Topics().Create(*topicName, 0)
}
}
createConsumer := func() Consumer {
options := ConsumerOptions{
SubscriptionName: "sub",
Type: Shared,
AckWithResponse: true,
}
if regex {
options.TopicsPattern = topicPrefix + ".*"
} else {
options.Topics = []string{topic1, topic2}
}
consumer, err := client.Subscribe(options)
assert.Nil(t, err)
return consumer
}
consumer := createConsumer()
sendMessages(t, client, topic1, 0, 3, false)
sendMessages(t, client, topic2, 0, 2, false)
receiveMessageMap := func(consumer Consumer, numMessages int) map[string][]Message {
msgs := receiveMessages(t, consumer, numMessages)
topicToMsgs := make(map[string][]Message)
for _, msg := range msgs {
topicToMsgs[msg.Topic()] = append(topicToMsgs[msg.Topic()], msg)
}
return topicToMsgs
}
topicToMsgs := receiveMessageMap(consumer, 5)
assert.Equal(t, 3, len(topicToMsgs[topic1]))
for i := 0; i < 3; i++ {
assert.Equal(t, fmt.Sprintf("msg-%d", i), string(topicToMsgs[topic1][i].Payload()))
}
assert.Equal(t, 2, len(topicToMsgs[topic2]))
for i := 0; i < 2; i++ {
assert.Equal(t, fmt.Sprintf("msg-%d", i), string(topicToMsgs[topic2][i].Payload()))
}
assert.Nil(t, consumer.AckIDList([]MessageID{
topicToMsgs[topic1][0].ID(),
topicToMsgs[topic1][2].ID(),
topicToMsgs[topic2][1].ID(),
}))
consumer.Close()
consumer = createConsumer()
topicToMsgs = receiveMessageMap(consumer, 2)
assert.Equal(t, 1, len(topicToMsgs[topic1]))
assert.Equal(t, "msg-1", string(topicToMsgs[topic1][0].Payload()))
assert.Equal(t, 1, len(topicToMsgs[topic2]))
assert.Equal(t, "msg-0", string(topicToMsgs[topic2][0].Payload()))
consumer.Close()
msgID0 := topicToMsgs[topic1][0].ID()
err = consumer.AckIDList([]MessageID{msgID0})
assert.NotNil(t, err)
t.Logf("AckIDList error: %v", err)
msgID1 := topicToMsgs[topic2][0].ID()
if ackError, ok := consumer.AckIDList([]MessageID{msgID0, msgID1}).(AckError); ok {
assert.Equal(t, 2, len(ackError))
assert.Contains(t, ackError, msgID0)
assert.Equal(t, "consumer state is closed", ackError[msgID0].Error())
assert.Contains(t, ackError, msgID1)
assert.Equal(t, "consumer state is closed", ackError[msgID1].Error())
} else {
assert.Fail(t, "AckIDList should return AckError")
}
}
type dummyConnection struct {
}
func (dummyConnection) SendRequest(_ uint64, _ *pb.BaseCommand, _ func(*pb.BaseCommand, error)) {
}
func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
return nil
}
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
}
func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
return nil
}
func (dummyConnection) UnregisterListener(_ uint64) {
}
func (dummyConnection) AddConsumeHandler(_ uint64, _ internal.ConsumerHandler) error {
return nil
}
func (dummyConnection) DeleteConsumeHandler(_ uint64) {
}
func (dummyConnection) ID() string {
return "cnx"
}
func (dummyConnection) GetMaxMessageSize() int32 {
return 5 * 1024 * 1024 // 5MB
}
func (dummyConnection) Close() {
}
func (dummyConnection) WaitForClose() <-chan struct{} {
return nil
}
func (dummyConnection) IsProxied() bool {
return false
}
func TestMultiTopicAckIDListTimeout(t *testing.T) {
topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano())
assert.NoError(t, createPartitionedTopic(topic, 5))
cli, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 3 * time.Second,
})
assert.Nil(t, err)
defer cli.Close()
consumer, err := cli.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub",
AckWithResponse: true,
})
assert.Nil(t, err)
defer consumer.Close()
const numMessages = 5
sendMessages(t, cli, topic, 0, numMessages, false)
msgs := receiveMessages(t, consumer, numMessages)
msgIDs := make([]MessageID, len(msgs))
var conn internal.Connection
for i := 0; i < len(msgs); i++ {
msgIDs[i] = msgs[i].ID()
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
conn = pc._getConn()
pc._setConn(dummyConnection{})
}
start := time.Now()
err = consumer.AckIDList(msgIDs)
elapsed := time.Since(start)
t.Logf("AckIDList takes %v ms", elapsed)
assert.True(t, elapsed < 5*time.Second && elapsed >= 3*time.Second)
var ackError AckError
if errors.As(err, &ackError) {
for _, err := range ackError {
assert.Equal(t, "request timed out", err.Error())
}
}
for i := 0; i < len(msgs); i++ {
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
pc._setConn(conn)
}
}