fix: allow zero queue consumer on a single partition of a partitioned topic (#1424)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 20227be..05db072 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -23,12 +23,9 @@
 	"fmt"
 	"math/rand"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
-	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
-
 	"github.com/apache/pulsar-client-go/pulsar/crypto"
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -269,11 +266,6 @@
 		return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
 	}
 
-	if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
-		strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
-		return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
-	}
-
 	if len(partitions) == 1 && options.EnableZeroQueueConsumer {
 		return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
 	}
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 3f2862d..5b85df8 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -66,7 +66,11 @@
 		consumerName:              options.Name,
 		metrics:                   client.metrics.GetLeveledMetrics(topic),
 	}
-	opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options)
+	tn, err := internal.ParseTopicName(topic)
+	if err != nil {
+		return nil, err
+	}
+	opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
 	conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
 	if err != nil {
 		return nil, err
@@ -142,11 +146,14 @@
 
 func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
 	partition := msgID.PartitionIdx()
-	if partition != 0 {
-		z.log.Errorf("invalid partition index %d expected a partition equal to 0",
-			partition)
-		return fmt.Errorf("invalid partition index %d expected a partition equal to 0",
-			partition)
+	if partition == 0 || partition == -1 {
+		return nil
+	}
+	if partition != z.pc.partitionIdx {
+		z.log.Errorf("invalid partition index %d expected a partition equal to %d",
+			partition, z.pc.partitionIdx)
+		return fmt.Errorf("invalid partition index %d expected a partition equal to %d",
+			partition, z.pc.partitionIdx)
 	}
 	return nil
 }
diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go
index 06db433..72048d7 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -115,7 +115,8 @@
 		assert.Equal(t, "pulsar", msg.Key())
 		assert.Equal(t, expectProperties, msg.Properties())
 		// ack message
-		consumer.Ack(msg)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
 		log.Printf("receive message: %s", msg.ID().String())
 	}
 	err = consumer.Unsubscribe()
@@ -228,7 +229,8 @@
 		assert.Equal(t, "pulsar", msg.Key())
 		assert.Equal(t, expectProperties, msg.Properties())
 		// ack message
-		consumer.Ack(msg)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
 		log.Printf("receive message: %s", msg.ID().String())
 	}
 	err = consumer.Unsubscribe()
@@ -341,7 +343,7 @@
 	assert.Nil(t, consumer)
 	assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
 }
-func TestOnePartitionZeroQueueConsumer(t *testing.T) {
+func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
@@ -350,17 +352,65 @@
 	defer client.Close()
 
 	topic := newTopicName()
-	err = createPartitionedTopic(topic, 1)
+	ctx := context.Background()
+	err = createPartitionedTopic(topic, 2)
+	assert.Nil(t, err)
+	topics, err := client.TopicPartitions(topic)
 	assert.Nil(t, err)
 
 	// create consumer
 	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:                   topic,
+		Topic:                   topics[1],
 		SubscriptionName:        "my-sub",
 		EnableZeroQueueConsumer: true,
 	})
-	assert.Nil(t, consumer)
-	assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
+	assert.Nil(t, err)
+	_, ok := consumer.(*zeroQueueConsumer)
+	assert.True(t, ok)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topics[1],
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "pulsar",
+			Properties: map[string]string{
+				"key-1": "pulsar-1",
+			},
+		})
+		assert.Nil(t, err)
+		log.Printf("send message: %s", msg.String())
+	}
+
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		expectProperties := map[string]string{
+			"key-1": "pulsar-1",
+		}
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+		assert.Equal(t, "pulsar", msg.Key())
+		assert.Equal(t, expectProperties, msg.Properties())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
+		log.Printf("receive message: %s", msg.ID().String())
+	}
+	err = consumer.Unsubscribe()
+	assert.Nil(t, err)
 }
 
 func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
@@ -576,7 +626,8 @@
 
 		if i%2 == 0 {
 			// Only acks even messages
-			consumer.Ack(msg)
+			err = consumer.Ack(msg)
+			assert.Nil(t, err)
 		} else {
 			// Fails to process odd messages
 			consumer.Nack(msg)
@@ -591,7 +642,8 @@
 		assert.Nil(t, err)
 		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
 
-		consumer.Ack(msg)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
 	}
 }
 
@@ -641,7 +693,8 @@
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
 		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
-		consumer.Ack(msg)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
 	}
 
 	err = consumer.Seek(seekID)
@@ -698,7 +751,8 @@
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
 		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
-		consumer.Ack(msg)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
 	}
 
 	currentTimestamp := time.Now()
@@ -711,6 +765,7 @@
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
 		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
-		consumer.Ack(msg)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
 	}
 }