blob: e83d341940b2ccb229f1db7be0a06c935fface0c [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pulsar
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestConsumerConnectError checks that subscribing through a client whose
// service URL points at an unresolvable host fails, returns no consumer, and
// reports a ConnectError result.
func TestConsumerConnectError(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://invalid-hostname:6650",
	})
	// Abort on failure: a non-fatal assertion would let the deferred Close
	// below run against a nil client and panic.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-subscription",
	})

	// Expect error in creating consumer
	assert.Nil(t, consumer)
	if err == nil {
		t.Fatal("expected Subscribe to return an error")
	}

	// Use a checked type assertion so an unexpected error type produces a
	// readable failure instead of a panic.
	pulsarErr, ok := err.(*Error)
	if !ok {
		t.Fatalf("expected *Error, got %T: %v", err, err)
	}
	assert.Equal(t, ConnectError, pulsarErr.Result())
}
// TestConsumer publishes ten messages one at a time on a freshly named topic
// and checks that a Shared subscription receives each of them with the
// expected payload, topic and publish time, and that the message ID survives
// a Serialize/DeserializeMessageID round trip. It finishes by seeking back to
// the earliest message and unsubscribing.
func TestConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	topic := fmt.Sprintf("my-topic-%d", time.Now().Unix())

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:             topic,
		SubscriptionName:  "my-sub",
		AckTimeout:        1 * time.Minute,
		Name:              "my-consumer-name",
		ReceiverQueueSize: 100,
		MaxTotalReceiverQueueSizeAcrossPartitions: 10000,
		Type: Shared,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	assert.Equal(t, "persistent://public/default/"+topic, consumer.Topic())
	assert.Equal(t, "my-sub", consumer.Subscription())

	ctx := context.Background()

	for i := 0; i < 10; i++ {
		sendTime := time.Now()
		if err := producer.Send(ctx, ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}); err != nil {
			t.Fatal(err)
		}

		msg, err := consumer.Receive(ctx)
		recvTime := time.Now()
		if err != nil {
			t.Fatal(err)
		}
		if !assert.NotNil(t, msg) {
			t.FailNow()
		}

		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
		assert.Equal(t, "persistent://public/default/"+topic, msg.Topic())

		fmt.Println("Send time: ", sendTime)
		fmt.Println("Publish time: ", msg.PublishTime())
		fmt.Println("Receive time: ", recvTime)

		// Timestamps are compared at one-second granularity.
		assert.True(t, sendTime.Unix() <= msg.PublishTime().Unix())
		assert.True(t, recvTime.Unix() >= msg.PublishTime().Unix())

		// The serialized ID must be non-empty and round-trip unchanged.
		serializedID := msg.ID().Serialize()
		deserializedID := DeserializeMessageID(serializedID)
		assert.True(t, len(serializedID) > 0)
		assert.True(t, bytes.Equal(deserializedID.Serialize(), serializedID))

		assert.Nil(t, consumer.Ack(msg))
	}

	err = consumer.Seek(EarliestMessage)
	assert.Nil(t, err)

	assert.Nil(t, consumer.Unsubscribe())
}
// TestConsumerCompaction publishes ten messages that share one key, triggers
// topic compaction through the admin REST endpoint, and then checks that a
// regular subscription still receives all ten messages while a subscription
// opened with ReadCompacted receives only the last one.
func TestConsumerCompaction(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix())

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	// Pre-create both subscriptions to retain published messages
	consumer1, err := client.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "my-sub-1",
	})
	if err != nil {
		t.Fatal(err)
	}
	consumer1.Close()

	consumer2, err := client.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "my-sub-2",
		ReadCompacted:    true,
	})
	if err != nil {
		t.Fatal(err)
	}
	consumer2.Close()

	ctx := context.Background()

	for i := 0; i < 10; i++ {
		if err := producer.Send(ctx, ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
			Key:     "Same-Key",
		}); err != nil {
			t.Fatal(err)
		}
	}

	// Compact topic and wait for operation to complete
	url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
	makeHttpPutCall(t, url)

	// Bound the polling so a compaction that never leaves RUNNING fails the
	// test instead of hanging it forever.
	const compactionTimeout = 1 * time.Minute
	deadline := time.Now().Add(compactionTimeout)
	for {
		res := makeHttpGetCall(t, url)
		if !strings.Contains(res, "RUNNING") {
			assert.True(t, strings.Contains(res, "SUCCESS"), "unexpected compaction status: %s", res)
			fmt.Println("Compaction is done")
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("compaction still running after %v: %s", compactionTimeout, res)
		}
		fmt.Println("Compaction still running")
		time.Sleep(100 * time.Millisecond)
	}

	// Restart the consumers
	consumer1, err = client.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "my-sub-1",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer1.Close()

	consumer2, err = client.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "my-sub-2",
		ReadCompacted:    true,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer2.Close()

	// Consumer-1 will receive all messages
	for i := 0; i < 10; i++ {
		msg, err := consumer1.Receive(context.Background())
		if err != nil {
			t.Fatal(err)
		}
		if !assert.NotNil(t, msg) {
			t.FailNow()
		}
		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
	}

	// Consumer-2 will only receive the last message
	msg, err := consumer2.Receive(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if !assert.NotNil(t, msg) {
		t.FailNow()
	}
	assert.Equal(t, "hello-9", string(msg.Payload()))

	// No more messages on consumer-2
	timeoutCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	msg, err = consumer2.Receive(timeoutCtx)
	assert.Nil(t, msg)
	assert.NotNil(t, err)
}
// TestConsumerWithInvalidConf checks that Subscribe rejects consumer options
// that set only a topic or only a subscription name, returning no consumer
// and an error whose result is InvalidConfiguration.
func TestConsumerWithInvalidConf(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	invalidOptions := []ConsumerOptions{
		{Topic: "my-topic"},                   // no subscription name
		{SubscriptionName: "my-subscription"}, // no topic
	}

	for _, opts := range invalidOptions {
		consumer, err := client.Subscribe(opts)

		// Expect error in creating consumer
		assert.Nil(t, consumer)

		// A checked type assertion also covers a nil error (ok is false), so
		// an unexpected success fails the test instead of panicking.
		pulsarErr, ok := err.(*Error)
		if !assert.True(t, ok, "expected *Error, got %T: %v", err, err) {
			continue
		}
		assert.Equal(t, InvalidConfiguration, pulsarErr.Result())
	}
}
// makeHttpPutCall sends an HTTP PUT request to url via makeHttpCall and
// returns the response body as a string.
func makeHttpPutCall(t *testing.T, url string) string {
	// Mark as a helper so failures are reported at the calling test's line.
	t.Helper()
	return makeHttpCall(t, http.MethodPut, url)
}
// makeHttpGetCall sends an HTTP GET request to url via makeHttpCall and
// returns the response body as a string.
func makeHttpGetCall(t *testing.T, url string) string {
	// Mark as a helper so failures are reported at the calling test's line.
	t.Helper()
	return makeHttpCall(t, http.MethodGet, url)
}
// makeHttpCall performs an HTTP request with the given method against url,
// sending JSON Content-Type and Accept headers and no request body, and
// returns the response body as a string. Any error while building the
// request, executing it, or reading the response fails the test immediately.
// The response status code is not inspected.
func makeHttpCall(t *testing.T, method string, url string) string {
	// Mark as a helper so failures are reported at the calling test's line.
	t.Helper()

	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		t.Fatal(err)
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")

	client := http.Client{}
	res, err := client.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	// Always close the body so the underlying connection is released; the
	// deferred call also runs when t.Fatal aborts the test below.
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		t.Fatal(err)
	}

	return string(body)
}
// TestConsumerMultiTopics subscribes one consumer to two topics, publishes
// ten messages to each through separate producers (checking each producer's
// LastSequenceID after every send), and verifies that the consumer receives
// and acknowledges all twenty messages.
func TestConsumerMultiTopics(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	producer1, err := client.CreateProducer(ProducerOptions{
		Topic: "multi-topic-1",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer1.Close()

	producer2, err := client.CreateProducer(ProducerOptions{
		Topic: "multi-topic-2",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer2.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topics:           []string{"multi-topic-1", "multi-topic-2"},
		SubscriptionName: "my-sub",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	assert.Equal(t, "my-sub", consumer.Subscription())

	ctx := context.Background()

	for i := 0; i < 10; i++ {
		if err := producer1.Send(ctx, ProducerMessage{
			Payload:    []byte(fmt.Sprintf("hello-%d", i)),
			SequenceID: 3,
		}); err != nil {
			t.Fatal(err)
		}
		assert.Equal(t, int64(3), producer1.LastSequenceID())

		if err := producer2.Send(ctx, ProducerMessage{
			Payload:    []byte(fmt.Sprintf("hello-%d", i)),
			SequenceID: 0,
		}); err != nil {
			t.Fatal(err)
		}
		assert.Equal(t, int64(i), producer2.LastSequenceID())
	}

	// Ten messages per topic means twenty in total on the shared subscription.
	for i := 0; i < 20; i++ {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if !assert.NotNil(t, msg) {
			t.FailNow()
		}
		assert.Nil(t, consumer.Ack(msg))
	}

	assert.Nil(t, consumer.Unsubscribe())
}
// TestConsumerRegex subscribes with a topic pattern that matches two topics,
// publishes ten messages to each, and verifies that the pattern consumer
// receives and acknowledges all twenty, waiting at most one second for each.
func TestConsumerRegex(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	producer1, err := client.CreateProducer(ProducerOptions{
		Topic: "topic-1",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer1.Close()

	producer2, err := client.CreateProducer(ProducerOptions{
		Topic: "topic-2",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer2.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		TopicsPattern:    "persistent://public/default/topic-.*",
		SubscriptionName: "my-sub",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	assert.Equal(t, "my-sub", consumer.Subscription())

	ctx := context.Background()

	for i := 0; i < 10; i++ {
		if err := producer1.Send(ctx, ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}); err != nil {
			t.Fatal(err)
		}

		if err := producer2.Send(ctx, ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}); err != nil {
			t.Fatal(err)
		}
	}

	for i := 0; i < 20; i++ {
		recvCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		msg, err := consumer.Receive(recvCtx)
		// Release the timeout's resources right away; a defer here would not
		// run until the whole test returns.
		cancel()
		if err != nil {
			t.Fatal(err)
		}
		if !assert.NotNil(t, msg) {
			t.FailNow()
		}
		assert.Nil(t, consumer.Ack(msg))
	}

	assert.Nil(t, consumer.Unsubscribe())
}
// TestConsumer_Seek publishes ten messages, consumes all of them, seeks the
// subscription back to EarliestMessage and checks that the next message
// received is the first one that was published.
func TestConsumer_Seek(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	topicName := "persistent://public/default/testSeek"
	subName := "sub-testSeek"

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topicName,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()
	assert.Equal(t, producer.Topic(), topicName)

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:            topicName,
		SubscriptionName: subName,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()
	assert.Equal(t, consumer.Topic(), topicName)
	assert.Equal(t, consumer.Subscription(), subName)

	ctx := context.Background()

	// Send 10 messages synchronously
	t.Log("Publishing 10 messages synchronously")
	for msgNum := 0; msgNum < 10; msgNum++ {
		if err := producer.Send(ctx, ProducerMessage{
			Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
		}); err != nil {
			t.Fatal(err)
		}
	}

	t.Log("Trying to receive 10 messages")
	for msgNum := 0; msgNum < 10; msgNum++ {
		_, err := consumer.Receive(ctx)
		assert.Nil(t, err)
	}

	// seek to earliest, expected receive first message.
	err = consumer.Seek(EarliestMessage)
	assert.Nil(t, err)

	// Sleeping for 500ms to wait for consumer re-connect
	time.Sleep(500 * time.Millisecond)

	msg, err := consumer.Receive(ctx)
	// Stop before touching msg: it cannot be used if the receive failed.
	if err != nil {
		t.Fatal(err)
	}
	if !assert.NotNil(t, msg) {
		t.FailNow()
	}
	t.Logf("again received message:%+v", msg.ID())
	assert.Equal(t, "msg-content-0", string(msg.Payload()))

	assert.Nil(t, consumer.Unsubscribe())
}
// TestConsumer_SubscriptionInitPos publishes two messages before any
// subscription exists, then subscribes with SubscriptionInitPos set to
// Earliest and expects the first received message to be the first one sent.
func TestConsumer_SubscriptionInitPos(t *testing.T) {
	pulsarClient, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	assert.Nil(t, err)
	defer pulsarClient.Close()

	topic := fmt.Sprintf("testSeek-%d", time.Now().Unix())
	const subscription = "test-subscription-initial-earliest-position"

	// Producer is created first so the messages predate the subscription.
	pub, err := pulsarClient.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	assert.Nil(t, err)
	defer pub.Close()

	// Publish both messages in order, checking each send.
	background := context.Background()
	for _, body := range []string{"msg-1-content-1", "msg-1-content-2"} {
		sendErr := pub.Send(background, ProducerMessage{
			Payload: []byte(body),
		})
		assert.Nil(t, sendErr)
	}

	// Subscribe only now, starting from the earliest position.
	sub, err := pulsarClient.Subscribe(ConsumerOptions{
		Topic:               topic,
		SubscriptionName:    subscription,
		SubscriptionInitPos: Earliest,
	})
	assert.Nil(t, err)
	defer sub.Close()

	first, recvErr := sub.Receive(background)
	assert.Nil(t, recvErr)
	assert.Equal(t, "msg-1-content-1", string(first.Payload()))
}
// TestConsumerNegativeAcks publishes ten messages asynchronously, negatively
// acknowledges each one on first delivery, and then expects the same ten
// messages to be delivered again in the same order, acknowledging them the
// second time. The consumer is configured with a 100ms NackRedeliveryDelay.
func TestConsumerNegativeAcks(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	topic := "TestConsumerNegativeAcks"

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	nackDelay := 100 * time.Millisecond

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:               topic,
		SubscriptionName:    "my-sub",
		NackRedeliveryDelay: &nackDelay,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	ctx := context.Background()

	const numMessages = 10

	// Collect the result of every async send so that a failed publish fails
	// the test instead of only being printed. The buffer holds one slot per
	// message, so the callbacks never block.
	sendResults := make(chan error, numMessages)
	for i := 0; i < numMessages; i++ {
		producer.SendAsync(ctx, ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}, func(producerMessage ProducerMessage, e error) {
			sendResults <- e
		})
	}

	producer.Flush()

	for i := 0; i < numMessages; i++ {
		select {
		case e := <-sendResults:
			if e != nil {
				t.Fatalf("async send failed: %v", e)
			}
		case <-time.After(30 * time.Second):
			t.Fatalf("timed out waiting for send callback %d of %d", i+1, numMessages)
		}
	}

	for i := 0; i < numMessages; i++ {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if !assert.NotNil(t, msg) {
			t.FailNow()
		}

		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))

		// Negatively acknowledge so the message is delivered again
		consumer.Nack(msg)
	}

	// Messages will be redelivered
	for i := 0; i < numMessages; i++ {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if !assert.NotNil(t, msg) {
			t.FailNow()
		}

		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))

		// This time acks successfully
		assert.Nil(t, consumer.Ack(msg))
	}

	assert.Nil(t, consumer.Unsubscribe())
}
// TestConsumerShared attaches two KeyShared consumers to the same
// subscription, publishes ten keyed messages (four distinct keys, batching
// disabled) and checks that the two consumers together receive and
// acknowledge all ten.
//
// Each consumer is drained in its own goroutine until a receive times out,
// and the test waits for both goroutines before returning, so no assertion
// or receive runs after the test has finished and closed the consumers.
func TestConsumerShared(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	// Setup failures are fatal: every later step dereferences these values.
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	topic := "persistent://public/default/test-topic-6"

	consumer1, err := client.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "sub-1",
		Type:             KeyShared,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer1.Close()

	consumer2, err := client.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "sub-1",
		Type:             KeyShared,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer2.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:    topic,
		Batching: false,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	ctx := context.Background()

	const numMessages = 10
	for i := 0; i < numMessages; i++ {
		err := producer.Send(ctx, ProducerMessage{
			Key:     fmt.Sprintf("key-shared-%d", i%4),
			Payload: []byte(fmt.Sprintf("value-%d", i)),
		})
		assert.Nil(t, err)
	}

	time.Sleep(time.Second * 5)

	// A consumer is considered drained once a receive waits this long
	// without getting a message.
	const idleTimeout = 2 * time.Second

	// Each goroutine reports how many messages it received. The buffer of
	// two lets both goroutines finish without waiting for the reader.
	counts := make(chan int, 2)

	go func() {
		received := 0
		defer func() { counts <- received }()
		for {
			recvCtx, cancel := context.WithTimeout(ctx, idleTimeout)
			msg, err := consumer1.Receive(recvCtx)
			timedOut := recvCtx.Err() != nil
			cancel()
			if err != nil {
				// Only a timeout is an expected way for the loop to end.
				assert.True(t, timedOut, "consumer1 receive failed: %v", err)
				return
			}
			if msg != nil {
				fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
				assert.Nil(t, consumer1.Ack(msg))
				received++
			}
		}
	}()

	go func() {
		received := 0
		defer func() { counts <- received }()
		for {
			recvCtx, cancel := context.WithTimeout(ctx, idleTimeout)
			msg2, err := consumer2.Receive(recvCtx)
			timedOut := recvCtx.Err() != nil
			cancel()
			if err != nil {
				// Only a timeout is an expected way for the loop to end.
				assert.True(t, timedOut, "consumer2 receive failed: %v", err)
				return
			}
			if msg2 != nil {
				fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
				assert.Nil(t, consumer2.Ack(msg2))
				received++
			}
		}
	}()

	// Wait for both receivers before the deferred Close calls run.
	total := <-counts + <-counts
	assert.Equal(t, numMessages, total)
}
// TestConsumer_AckTimeout sends a single message, receives it without
// acknowledging, sleeps for the consumer's configured AckTimeout and then
// expects the same payload to be delivered again, acknowledging it the second
// time before unsubscribing.
func TestConsumer_AckTimeout(t *testing.T) {
	const (
		subscription = "my-sub"
		payload      = "hello-pulsar"
		ackTimeout   = 10 * time.Second
	)

	pulsarClient, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	assert.Nil(t, err)
	defer pulsarClient.Close()

	topic := fmt.Sprintf("my-topic-%d", time.Now().Unix())

	pub, err := pulsarClient.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	assert.Nil(t, err)
	defer pub.Close()

	sub, err := pulsarClient.Subscribe(ConsumerOptions{
		Topic:            topic,
		SubscriptionName: subscription,
		AckTimeout:       ackTimeout,
		Name:             "my-consumer-name",
		Type:             Shared,
	})
	assert.Nil(t, err)
	defer sub.Close()

	assert.Equal(t, sub.Topic(), "persistent://public/default/"+topic)
	assert.Equal(t, sub.Subscription(), subscription)

	background := context.Background()

	// Publish the single message under test.
	sendErr := pub.Send(background, ProducerMessage{
		Payload: []byte(payload),
	})
	if sendErr != nil {
		t.Fatal(sendErr)
	}

	// First delivery: deliberately left unacknowledged.
	first, err := sub.Receive(background)
	assert.Nil(t, err)
	assert.Equal(t, string(first.Payload()), payload)

	// Let the ack timeout elapse.
	time.Sleep(ackTimeout)

	// Second delivery of the same message.
	second, err := sub.Receive(background)
	assert.Nil(t, err)
	assert.Equal(t, string(second.Payload()), payload)

	assert.Nil(t, sub.Ack(second))
	assert.Nil(t, sub.Unsubscribe())
}