fix: improve zero queue consumer support for partitioned topics (#1424)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 20227be..05db072 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -23,12 +23,9 @@
"fmt"
"math/rand"
"strconv"
- "strings"
"sync"
"time"
- "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
-
"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -269,11 +266,6 @@
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}
- if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
- strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
- return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
- }
-
if len(partitions) == 1 && options.EnableZeroQueueConsumer {
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
}
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 3f2862d..5b85df8 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -66,7 +66,11 @@
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
}
- opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options)
+ tn, err := internal.ParseTopicName(topic)
+ if err != nil {
+ return nil, err
+ }
+ opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
if err != nil {
return nil, err
@@ -142,11 +146,14 @@
func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
partition := msgID.PartitionIdx()
- if partition != 0 {
- z.log.Errorf("invalid partition index %d expected a partition equal to 0",
- partition)
- return fmt.Errorf("invalid partition index %d expected a partition equal to 0",
- partition)
+ if partition == 0 || partition == -1 {
+ return nil
+ }
+ if partition != z.pc.partitionIdx {
+ z.log.Errorf("invalid partition index %d expected a partition equal to %d",
+ partition, z.pc.partitionIdx)
+ return fmt.Errorf("invalid partition index %d expected a partition equal to %d",
+ partition, z.pc.partitionIdx)
}
return nil
}
diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go
index 06db433..72048d7 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -115,7 +115,8 @@
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
@@ -228,7 +229,8 @@
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
@@ -341,7 +343,7 @@
assert.Nil(t, consumer)
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
}
-func TestOnePartitionZeroQueueConsumer(t *testing.T) {
+func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -350,17 +352,65 @@
defer client.Close()
topic := newTopicName()
- err = createPartitionedTopic(topic, 1)
+ ctx := context.Background()
+ err = createPartitionedTopic(topic, 2)
+ assert.Nil(t, err)
+ topics, err := client.TopicPartitions(topic)
assert.Nil(t, err)
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
- Topic: topic,
+ Topic: topics[1],
SubscriptionName: "my-sub",
EnableZeroQueueConsumer: true,
})
- assert.Nil(t, consumer)
- assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
+ assert.Nil(t, err)
+ _, ok := consumer.(*zeroQueueConsumer)
+ assert.True(t, ok)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topics[1],
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ log.Printf("send message: %s", msg.String())
+ }
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+ // ack message
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ log.Printf("receive message: %s", msg.ID().String())
+ }
+ err = consumer.Unsubscribe()
+ assert.Nil(t, err)
}
func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
@@ -576,7 +626,8 @@
if i%2 == 0 {
// Only acks even messages
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
} else {
// Fails to process odd messages
consumer.Nack(msg)
@@ -591,7 +642,8 @@
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
}
}
@@ -641,7 +693,8 @@
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
}
err = consumer.Seek(seekID)
@@ -698,7 +751,8 @@
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
}
currentTimestamp := time.Now()
@@ -711,6 +765,7 @@
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
- consumer.Ack(msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
}
}