blob: 97b4264136b6d52d0cf6394afb8f2439e78094eb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"fmt"
"log"
"net/http"
"strings"
"testing"
"time"
"github.com/apache/pulsar-client-go/util"
"github.com/stretchr/testify/assert"
)
var (
adminURL = "http://localhost:8080"
lookupURL = "pulsar://localhost:6650"
)
func TestProducerConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "my-topic"
ctx := context.Background()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}
// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
if err := consumer.Ack(msg); err != nil {
log.Fatal(err)
}
}
// unsubscribe consumer
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
}
func TestConsumerConnectError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://invalid-hostname:6650",
})
assert.Nil(t, err)
defer client.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
})
// Expect error in creating consumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "connection error")
}
func TestBatchMessageReceive(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "persistent://public/default/receive-batch"
subName := "subscription-name"
prefix := "msg-batch-"
ctx := context.Background()
// Enable batching on producer side
batchSize, numOfMessages := 2, 100
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
BatchingMaxMessages: uint(batchSize),
DisableBatching: false,
BlockIfQueueFull: true,
})
assert.Nil(t, err)
assert.Equal(t, topicName, producer.Topic())
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
assert.Equal(t, topicName, consumer.Topic())
count := 0
for i := 0; i < numOfMessages; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
msg := &ProducerMessage{
Payload: []byte(messageContent),
}
err := producer.Send(ctx, msg)
assert.Nil(t, err)
}
for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
err = consumer.Ack(msg)
assert.Nil(t, err)
count++
}
// check strategically
for i := 0; i < 3; i++ {
if count == numOfMessages {
break
}
time.Sleep(time.Second)
}
assert.Equal(t, count, numOfMessages)
}
func TestConsumerWithInvalidConf(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
t.Fatal(err)
return
}
defer client.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-topic",
})
// Expect error in creating cosnumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
fmt.Println(err.Error())
assert.Equal(t, err.(*Error).Result(), SubscriptionNotFound)
consumer, err = client.Subscribe(ConsumerOptions{
SubscriptionName: "my-subscription",
})
// Expect error in creating consumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix())
subName := "test-subscription-initial-earliest-position"
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
//sent message
ctx := context.Background()
err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-1"),
})
assert.Nil(t, err)
err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-2"),
})
assert.Nil(t, err)
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
SubscriptionInitPos: Earliest,
})
assert.Nil(t, err)
defer consumer.Close()
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
}
func makeHTTPCall(t *testing.T, method string, urls string, body string) {
client := http.Client{}
req, err := http.NewRequest(method, urls, strings.NewReader(body))
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
res, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
}
func TestConsumerKeyShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/test-topic-6"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer1.Close()
consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer2.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
err := producer.Send(ctx, &ProducerMessage{
Key: fmt.Sprintf("key-shared-%d", i%3),
Payload: []byte(fmt.Sprintf("value-%d", i)),
})
assert.Nil(t, err)
}
time.Sleep(time.Second * 1)
go func() {
for i := 0; i < 10; i++ {
msg, err := consumer1.Receive(ctx)
assert.Nil(t, err)
if msg != nil {
fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
err = consumer1.Ack(msg)
assert.Nil(t, err)
}
}
}()
go func() {
for i := 0; i < 10; i++ {
msg2, err := consumer2.Receive(ctx)
assert.Nil(t, err)
if msg2 != nil {
fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
err = consumer2.Ack(msg2)
assert.Nil(t, err)
}
}
}()
time.Sleep(time.Second * 1)
}
func TestPartitionTopicsConsumerPubSub(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "64")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()
topics, err := client.TopicPartitions(topic)
assert.Nil(t, err)
assert.Equal(t, topic+"-partition-0", topics[0])
assert.Equal(t, topic+"-partition-1", topics[1])
assert.Equal(t, topic+"-partition-2", topics[2])
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
ReceiverQueueSize: 10,
})
assert.Nil(t, err)
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
msgs := make([]string, 0)
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
if err := consumer.Ack(msg); err != nil {
assert.Nil(t, err)
}
}
assert.Equal(t, len(msgs), 10)
}
func TestConsumer_ReceiveAsync(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "persistent://public/default/receive-async"
subName := "subscription-receive-async"
ctx := context.Background()
ch := make(chan ConsumerMessage, 10)
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
defer consumer.Close()
//send 10 messages
for i := 0; i < 10; i++ {
err = producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
//receive async 10 messages
err = consumer.ReceiveAsync(ctx, ch)
assert.Nil(t, err)
payloadList := make([]string, 0, 10)
RECEIVE:
for {
select {
case cMsg, ok := <-ch:
if ok {
fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
assert.Equal(t, topicName, cMsg.Message.Topic())
assert.Equal(t, topicName, cMsg.Consumer.Topic())
payloadList = append(payloadList, string(cMsg.Message.Payload()))
if len(payloadList) == 10 {
break RECEIVE
}
}
continue RECEIVE
case <-ctx.Done():
t.Error("context error.")
return
}
}
}
func TestConsumerAckTimeout(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "test-ack-timeout-topic-1"
ctx := context.Background()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub1",
Type: Shared,
AckTimeout: 5 * 1000,
})
assert.Nil(t, err)
defer consumer.Close()
// create consumer1
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub2",
Type: Shared,
AckTimeout: 5 * 1000,
})
assert.Nil(t, err)
defer consumer1.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}
// consumer receive 10 messages
payloadList := make([]string, 0, 10)
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
payloadList = append(payloadList, string(msg.Payload()))
// not ack message
}
assert.Equal(t, 10, len(payloadList))
// consumer1 receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
payloadList = append(payloadList, string(msg.Payload()))
// ack half of the messages
if i%2 == 0 {
err = consumer1.Ack(msg)
assert.Nil(t, err)
}
}
// wait ack timeout
time.Sleep(6 * time.Second)
fmt.Println("start redeliver messages...")
payloadList = make([]string, 0, 10)
// consumer receive messages again
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
payloadList = append(payloadList, string(msg.Payload()))
// ack message
if err := consumer.Ack(msg); err != nil {
log.Fatal(err)
}
}
assert.Equal(t, 10, len(payloadList))
payloadList = make([]string, 0, 5)
// consumer1 receive messages again
go func() {
for i := 0; i < 10; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
expectMsg := fmt.Sprintf("hello-%d", i)
fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
payloadList = append(payloadList, string(msg.Payload()))
// ack message
if err := consumer1.Ack(msg); err != nil {
log.Fatal(err)
}
}
assert.Equal(t, 5, len(payloadList))
}()
// sleep 2 seconds, wait gorutine receive messages.
time.Sleep(time.Second * 2)
}
func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "persistent://public/default/receive-async-with-callback"
subName := "subscription-receive-async"
ctx := context.Background()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
defer consumer.Close()
//send 10 messages
for i := 0; i < 10; i++ {
err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
for i := 0; i < 10; i++ {
tmpMsg := fmt.Sprintf("hello-%d", i)
consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
if err != nil {
log.Fatal(err)
}
fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
assert.Equal(t, tmpMsg, string(msg.Payload()))
})
}
}
func TestConsumer_Shared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/testMultiPartitionConsumerShared"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")
sub := "sub-shared-1"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
Type: Shared,
})
assert.Nil(t, err)
defer consumer1.Close()
consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
Type: Shared,
})
assert.Nil(t, err)
defer consumer2.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}
msgList := make([]string, 0, 5)
for i := 0; i < 5; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
msgList = append(msgList, string(msg.Payload()))
if err := consumer1.Ack(msg); err != nil {
log.Fatal(err)
}
}
assert.Equal(t, 5, len(msgList))
for i := 0; i < 5; i++ {
msg, err := consumer2.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
if err := consumer2.Ack(msg); err != nil {
log.Fatal(err)
}
fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
msgList = append(msgList, string(msg.Payload()))
}
assert.Equal(t, 10, len(msgList))
res := util.RemoveDuplicateElement(msgList)
assert.Equal(t, 10, len(res))
}