blob: 3fa2b89d3b8bb63e213306e8ed1561786f6937d6 [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pulsar
import (
"context"
"fmt"
"strings"
"testing"
"time"
log "github.com/apache/pulsar/pulsar-client-go/logutil"
"github.com/stretchr/testify/assert"
)
func TestInvalidURL(t *testing.T) {
client, err := NewClient(ClientOptions{})
if client != nil || err == nil {
t.Fatal("Should have failed to create client")
}
}
func TestProducerConnectError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://invalid-hostname:6650",
})
assert.Nil(t, err)
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-topic",
})
// Expect error in creating producer
assert.Nil(t, producer)
assert.NotNil(t, err)
assert.Equal(t, err.(*Error).Result(), ConnectError)
}
func TestProducer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
StatsIntervalInSeconds: 10,
IOThreads: 1,
OperationTimeoutSeconds: 30,
ConcurrentLookupRequests: 1000,
MessageListenerThreads: 5,
})
assert.Nil(t, err)
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-topic",
Name: "my-producer-name",
SendTimeout: 10 * time.Second,
Batching: true,
BatchingMaxMessages: 100,
BatchingMaxPublishDelay: 10 * time.Millisecond,
MaxPendingMessages: 100,
BlockIfQueueFull: true,
CompressionType: LZ4,
Properties: map[string]string{
"my-name": "test",
"key": "value",
},
})
assert.Nil(t, err)
defer producer.Close()
assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")
assert.Equal(t, producer.Name(), "my-producer-name")
assert.Equal(t, producer.LastSequenceID(), int64(-1))
ctx := context.Background()
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}
assert.Equal(t, producer.LastSequenceID(), int64(i))
}
assert.Equal(t, producer.LastSequenceID(), int64(9))
}
func TestProducerNoTopic(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
t.Fatal(err)
return
}
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{})
// Expect error in creating producer
assert.Nil(t, producer)
assert.NotNil(t, err)
assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
}
func TestMessageRouter(t *testing.T) {
// Create topic with 5 partitions
httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions",
5)
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
// Only subscribe on the specific partition
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-partitioned-topic-partition-2",
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-partitioned-topic",
MessageRouter: func(msg Message, tm TopicMetadata) int {
fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
return 2
},
})
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
err = producer.Send(ctx, ProducerMessage{
Payload: []byte("hello"),
SequenceID: 1234,
})
assert.Nil(t, err)
assert.Equal(t, producer.LastSequenceID(), int64(1234))
fmt.Println("PUBLISHED")
// Verify message was published on partition 2
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Equal(t, string(msg.Payload()), "hello")
}
func TestProducerZstd(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-topic",
CompressionType: ZSTD,
})
assert.Nil(t, err)
defer producer.Close()
assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")
ctx := context.Background()
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
}
func TestProducerSnappy(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-topic",
CompressionType: SNAPPY,
})
assert.Nil(t, err)
defer producer.Close()
assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")
ctx := context.Background()
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
}
func TestProducer_Flush(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-flush-in-producer"
subName := "subscription-name"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
Properties: map[string]string{
"producer-name": "test-producer-name",
"producer-id": "test-producer-id",
},
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
Properties: map[string]string{
"consumer-name": "test-consumer-name",
"consumer-id": "test-consumer-id",
},
})
assert.Nil(t, err)
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
// Create a different message to send asynchronously
asyncMsg := ProducerMessage{
Payload: []byte(fmt.Sprintf("async-message-%d", i)),
}
// Attempt to send the message asynchronously and handle the response
producer.SendAsync(ctx, asyncMsg, func(msg ProducerMessage, err error) {
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message %s successfully published", msg.Payload)
})
producer.Flush()
}
}
func TestProducer_MessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-message-id"
subName := "sub-1"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
Batching: true,
BatchingMaxMessages: 5,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
// Create a different message to send asynchronously
asyncMsg := ProducerMessage{
Payload: []byte(fmt.Sprintf("async-message-%d", i)),
}
// Attempt to send the message asynchronously and handle the response
producer.SendAsync(ctx, asyncMsg, func(msg ProducerMessage, err error) {
if err != nil {
log.Fatal(err)
}
})
}
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
err = consumer.Ack(msg)
assert.Nil(t, err)
// msgID output: (11,16,-1,0)
msgID := fmt.Sprintf("%v", msg.ID())
index := strings.Index(msgID, "-1")
assert.Equal(t, 6, index)
}
}
func TestProducer_Batch(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-batch-in-producer-111"
subName := "subscription-name111"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
Batching: true,
BatchingMaxMessages: 5,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
// Create a different message to send asynchronously
asyncMsg := ProducerMessage{
Payload: []byte(fmt.Sprintf("async-message-%d", i)),
}
// Attempt to send the message asynchronously and handle the response
producer.SendAsync(ctx, asyncMsg, func(msg ProducerMessage, err error) {
if err != nil {
log.Fatal(err)
}
})
}
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
err = consumer.Ack(msg)
assert.Nil(t, err)
msgID := fmt.Sprintf("message ID:%v", msg.ID())
num := strings.Count(msgID, "-1")
assert.Equal(t, 1, num)
}
}
func TestProducer_SendAndGetMsgID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-send-with-message-id"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
for i := 0; i < 10; i++ {
msgID, err := producer.SendAndGetMsgID(context.Background(), ProducerMessage{
Payload: []byte(fmt.Sprintf("async-message-%d", i)),
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("enable batch, the message id: %v\n", msgID)
assert.NotNil(t, IsNil(msgID))
}
}
func TestProducer_DelayMessage(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-send-with-message-id"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
failoverConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-delay-message-failover",
})
assert.Nil(t, err)
defer failoverConsumer.Close()
sharedConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-delay-message-shared",
Type: Shared,
})
assert.Nil(t, err)
defer sharedConsumer.Close()
ctx := context.Background()
delay := time.Second * 5
begin := time.Now()
t.Logf("begin %v\n", begin)
for i := 0; i < 10; i++ {
err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
DeliverAfter: delay,
})
t.Logf("send message %d\n", i)
assert.Nil(t, err)
}
// Failover consumer will receive the messages immediately while
// the shared consumer will get them after the delay
for i := 0; i < 10; i++ {
msg, err := failoverConsumer.Receive(ctx)
assert.Nil(t, err)
t.Logf("message: %s\n", msg.Payload())
err = failoverConsumer.Ack(msg)
assert.Nil(t, err)
t.Logf("after %v\n", time.Now())
assert.True(t, time.Since(begin) < delay)
}
for i := 0; i < 10; i++ {
msg, err := sharedConsumer.Receive(ctx)
assert.Nil(t, err)
t.Logf("message: %s\n", msg.Payload())
err = sharedConsumer.Ack(msg)
assert.Nil(t, err)
t.Logf("after %v\n", time.Now())
assert.True(t, time.Since(begin) > delay)
}
}