blob: 6c0c3f21c4fae277e12d85b80754bedab5969f61 [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import random
import threading
import logging
from typing import Optional
from unittest import TestCase, main
import time
import os
import re
import pulsar
import uuid
from datetime import timedelta
from pulsar import (
Client,
MessageId,
CompressionType,
ConsumerType,
KeySharedMode,
ConsumerKeySharedPolicy,
PartitionsRoutingMode,
AuthenticationBasic,
AuthenticationTLS,
Authentication,
AuthenticationToken,
InitialPosition,
CryptoKeyReader,
ConsumerBatchReceivePolicy,
ProducerAccessMode,
ConsumerDeadLetterPolicy,
)
from pulsar.schema import JsonSchema, Record, Integer
from _pulsar import ProducerConfiguration, ConsumerConfiguration, RegexSubscriptionMode
from urllib.request import urlopen, Request
TM = 10000 # Do not wait forever in tests
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
TOKEN_PATH = TESTS_DIR + "/.test-token.txt"
CERTS_DIR = TESTS_DIR + "/test-conf/"
def doHttpPost(url, data):
req = Request(url, data.encode())
req.add_header("Content-Type", "application/json")
urlopen(req)
def doHttpPut(url, data):
try:
req = Request(url, data.encode())
req.add_header("Content-Type", "application/json")
req.get_method = lambda: "PUT"
urlopen(req)
except Exception as ex:
# ignore conflicts exception to have test idempotency
if "409" in str(ex):
pass
else:
raise ex
def doHttpGet(url):
req = Request(url)
req.add_header("Accept", "application/json")
return urlopen(req).read()
class TestRecord(Record):
a = Integer()
b = Integer()
class PulsarTest(TestCase):
serviceUrl = "pulsar://localhost:6650"
adminUrl = "http://localhost:8080"
serviceUrlTls = "pulsar+ssl://localhost:6651"
def test_producer_config(self):
conf = ProducerConfiguration()
conf.send_timeout_millis(12)
self.assertEqual(conf.send_timeout_millis(), 12)
self.assertEqual(conf.compression_type(), CompressionType.NONE)
conf.compression_type(CompressionType.LZ4)
self.assertEqual(conf.compression_type(), CompressionType.LZ4)
conf.max_pending_messages(120)
self.assertEqual(conf.max_pending_messages(), 120)
def test_consumer_config(self):
conf = ConsumerConfiguration()
self.assertEqual(conf.consumer_type(), ConsumerType.Exclusive)
conf.consumer_type(ConsumerType.Shared)
self.assertEqual(conf.consumer_type(), ConsumerType.Shared)
self.assertEqual(conf.consumer_name(), "")
conf.consumer_name("my-name")
self.assertEqual(conf.consumer_name(), "my-name")
self.assertEqual(conf.replicate_subscription_state_enabled(), False)
conf.replicate_subscription_state_enabled(True)
self.assertEqual(conf.replicate_subscription_state_enabled(), True)
def test_connect_error(self):
with self.assertRaises(ValueError):
Client("fakeServiceUrl")
def test_exception_inheritance(self):
assert issubclass(pulsar.ConnectError, pulsar.PulsarException)
assert issubclass(pulsar.PulsarException, Exception)
def test_simple_producer(self):
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic")
producer.send(b"hello")
producer.close()
client.close()
def test_producer_send_async(self):
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic")
sent_messages = []
def send_callback(producer, msg):
sent_messages.append(msg)
producer.send_async(b"hello", send_callback)
producer.send_async(b"hello", send_callback)
producer.send_async(b"hello", send_callback)
i = 0
while len(sent_messages) < 3 and i < 100:
time.sleep(0.1)
i += 1
self.assertEqual(len(sent_messages), 3)
client.close()
def test_producer_send(self):
client = Client(self.serviceUrl)
topic = "test_producer_send"
producer = client.create_producer(topic)
consumer = client.subscribe(topic, "sub-name")
msg_id = producer.send(b"hello")
print("send to {}".format(msg_id))
msg = consumer.receive(TM)
consumer.acknowledge(msg)
print("receive from {}".format(msg.message_id()))
self.assertEqual(msg_id, msg.message_id())
client.close()
def test_producer_access_mode_exclusive(self):
client = Client(self.serviceUrl)
topic_name = "test-access-mode-exclusive"
client.create_producer(topic_name, producer_name="p1", access_mode=ProducerAccessMode.Exclusive)
with self.assertRaises(pulsar.ProducerFenced):
client.create_producer(topic_name, producer_name="p2", access_mode=ProducerAccessMode.Exclusive)
client.close()
def test_producer_access_mode_wait_exclusive(self):
client = Client(self.serviceUrl)
topic_name = "test_producer_access_mode_wait_exclusive"
producer1 = client.create_producer(
topic=topic_name,
producer_name='p-1',
access_mode=ProducerAccessMode.Exclusive
)
assert producer1.producer_name() == 'p-1'
# when p1 close, p2 success created.
producer1.close()
producer2 = client.create_producer(
topic=topic_name,
producer_name='p-2',
access_mode=ProducerAccessMode.WaitForExclusive
)
assert producer2.producer_name() == 'p-2'
producer2.close()
client.close()
def test_producer_access_mode_exclusive_with_fencing(self):
client = Client(self.serviceUrl)
topic_name = 'test_producer_access_mode_exclusive_with_fencing'
producer1 = client.create_producer(
topic=topic_name,
producer_name='p-1',
access_mode=ProducerAccessMode.Exclusive
)
assert producer1.producer_name() == 'p-1'
producer2 = client.create_producer(
topic=topic_name,
producer_name='p-2',
access_mode=ProducerAccessMode.ExclusiveWithFencing
)
assert producer2.producer_name() == 'p-2'
# producer1 will be fenced.
with self.assertRaises((pulsar.ProducerFenced, pulsar.AlreadyClosed)):
producer1.send('test-msg'.encode('utf-8'))
# sleep 200ms to make sure producer1 is close done.
time.sleep(0.2)
producer2.close()
client.close()
def test_producer_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_producer_is_connected"
producer = client.create_producer(topic)
self.assertTrue(producer.is_connected())
producer.close()
self.assertFalse(producer.is_connected())
client.close()
def test_producer_consumer(self):
client = Client(self.serviceUrl)
consumer = client.subscribe("my-python-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-python-topic-producer-consumer")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
consumer.unsubscribe()
client.close()
def test_ordering_key(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-ordering-key", "my-sub", consumer_type=ConsumerType.KeyShared
)
producer = client.create_producer("my-python-topic-ordering-key")
producer.send(b"ordered-hello", ordering_key="random-ordering-key")
# Message should be available immediately with ordering key set
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"ordered-hello")
self.assertEqual(msg.ordering_key(), "random-ordering-key")
consumer.unsubscribe()
producer.close()
client.close()
def test_redelivery_count(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-redelivery-count",
"my-sub",
consumer_type=ConsumerType.Shared,
negative_ack_redelivery_delay_ms=500,
)
producer = client.create_producer("my-python-topic-redelivery-count")
producer.send(b"hello")
redelivery_count = 0
for i in range(4):
msg = consumer.receive(TM)
print("Received message %s" % msg.data())
if i % 2 == 0:
consumer.negative_acknowledge(msg)
else:
consumer.negative_acknowledge(msg.message_id())
redelivery_count = msg.redelivery_count()
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
self.assertEqual(3, redelivery_count)
consumer.unsubscribe()
producer.close()
client.close()
def test_deliver_at(self):
client = Client(self.serviceUrl)
consumer = client.subscribe("my-python-topic-deliver-at", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-python-topic-deliver-at")
# Delay message in 1.1s
producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + 1100)
# Message should not be available in the next second
with self.assertRaises(pulsar.Timeout):
consumer.receive(1000)
# Message should be published now
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.unsubscribe()
producer.close()
client.close()
def test_deliver_after(self):
client = Client(self.serviceUrl)
consumer = client.subscribe("my-python-topic-deliver-after", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-python-topic-deliver-after")
# Delay message in 1.1s
producer.send(b"hello", deliver_after=timedelta(milliseconds=1100))
# Message should not be available in the next second
with self.assertRaises(pulsar.Timeout):
consumer.receive(1000)
# Message should be published in the next 500ms
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.unsubscribe()
producer.close()
client.close()
def test_consumer_initial_position(self):
client = Client(self.serviceUrl)
producer = client.create_producer("consumer-initial-position")
# Sending 5 messages before consumer creation.
# These should be received with initial_position set to Earliest but not with Latest.
for i in range(5):
producer.send(b"hello-%d" % i)
consumer = client.subscribe(
"consumer-initial-position",
"my-sub",
consumer_type=ConsumerType.Shared,
initial_position=InitialPosition.Earliest,
)
# Sending 5 other messages that should be received regardless of the initial_position.
for i in range(5, 10):
producer.send(b"hello-%d" % i)
for i in range(10):
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
consumer.unsubscribe()
client.close()
def test_consumer_queue_size_is_zero(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-consumer-init-queue-size-is-zero",
"my-sub",
consumer_type=ConsumerType.Shared,
receiver_queue_size=0,
initial_position=InitialPosition.Earliest,
)
producer = client.create_producer("my-python-topic-consumer-init-queue-size-is-zero")
producer.send(b"hello")
time.sleep(0.1)
msg = consumer.receive()
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.unsubscribe()
client.close()
def test_message_properties(self):
client = Client(self.serviceUrl)
topic = "my-python-test-message-properties"
consumer = client.subscribe(
topic=topic, subscription_name="my-subscription", schema=pulsar.schema.StringSchema()
)
producer = client.create_producer(topic=topic, schema=pulsar.schema.StringSchema())
producer.send("hello", properties={"a": "1", "b": "2"})
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), "hello")
self.assertEqual(msg.properties(), {"a": "1", "b": "2"})
consumer.unsubscribe()
client.close()
def test_tls_auth(self):
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=AuthenticationTLS(CERTS_DIR + "client-cert.pem", CERTS_DIR + "client-key.pem"),
)
topic = "my-python-topic-tls-auth-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_tls_auth2(self):
authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams),
)
topic = "my-python-topic-tls-auth-2-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_encryption(self):
publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption"
consumer = client.subscribe(
topic=topic, subscription_name="my-subscription", crypto_key_reader=crypto_key_reader
)
producer = client.create_producer(
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
)
reader = client.create_reader(
topic=topic, start_message_id=MessageId.earliest, crypto_key_reader=crypto_key_reader
)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), b"hello")
consumer.unsubscribe()
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
reader.close()
client.close()
def test_encryption_failure(self):
publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
producer = client.create_producer(
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
)
producer.send(b"msg-0")
def verify_next_message(value: bytes):
consumer = client.subscribe(topic, subscription,
crypto_key_reader=crypto_key_reader)
msg = consumer.receive(3000)
self.assertEqual(msg.data(), value)
consumer.acknowledge(msg)
consumer.close()
subscription = "my-sub"
consumer = client.subscribe(topic, subscription,
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL)
with self.assertRaises(pulsar.Timeout):
consumer.receive(3000)
consumer.close()
producer.send(b"msg-1")
verify_next_message(b"msg-0") # msg-0 won't be skipped
consumer = client.subscribe(topic, subscription,
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD)
with self.assertRaises(pulsar.Timeout):
consumer.receive(3000)
consumer.close()
producer.send(b"msg-2")
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD
# Encrypted messages will be consumed since the crypto failure action is CONSUME
consumer = client.subscribe(topic, 'another-sub',
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = consumer.receive(3000)
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
reader = client.create_reader(topic, MessageId.earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = reader.read_next(3000)
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
client.close()
def test_tls_auth3(self):
authPlugin = "tls"
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams),
)
topic = "my-python-topic-tls-auth-3-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_auth_junk_params(self):
authPlugin = "someoldjunk.so"
authParams = "blah"
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams),
)
with self.assertRaises(pulsar.AuthenticationError):
client.subscribe("my-python-topic-auth-junk-params", "my-sub", consumer_type=ConsumerType.Shared)
def test_message_listener(self):
client = Client(self.serviceUrl)
received_messages = []
def listener(consumer, msg):
print("Got message: %s" % msg)
received_messages.append(msg)
consumer.acknowledge(msg)
client.subscribe(
"my-python-topic-listener", "my-sub", consumer_type=ConsumerType.Exclusive, message_listener=listener
)
producer = client.create_producer("my-python-topic-listener")
producer.send(b"hello-1")
producer.send(b"hello-2")
producer.send(b"hello-3")
time.sleep(0.1)
self.assertEqual(len(received_messages), 3)
self.assertEqual(received_messages[0].data(), b"hello-1")
self.assertEqual(received_messages[1].data(), b"hello-2")
self.assertEqual(received_messages[2].data(), b"hello-3")
client.close()
def test_consumer_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_consumer_is_connected"
sub = "sub"
consumer = client.subscribe(topic, sub)
self.assertTrue(consumer.is_connected())
consumer.close()
self.assertFalse(consumer.is_connected())
client.close()
def test_reader_simple(self):
client = Client(self.serviceUrl)
reader = client.create_reader("my-python-topic-reader-simple", MessageId.earliest)
producer = client.create_producer("my-python-topic-reader-simple")
producer.send(b"hello")
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
reader.close()
client.close()
def test_reader_on_last_message(self):
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic-reader-on-last-message")
for i in range(10):
producer.send(b"hello-%d" % i)
reader = client.create_reader("my-python-topic-reader-on-last-message", MessageId.latest)
for i in range(10, 20):
producer.send(b"hello-%d" % i)
for i in range(10, 20):
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
reader.close()
client.close()
def test_reader_on_specific_message(self):
num_of_msgs = 10
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic-reader-on-specific-message")
for i in range(num_of_msgs):
producer.send(b"hello-%d" % i)
reader1 = client.create_reader("my-python-topic-reader-on-specific-message", MessageId.earliest)
for i in range(num_of_msgs // 2):
msg = reader1.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
last_msg_id = msg.message_id()
last_msg_idx = i
reader2 = client.create_reader("my-python-topic-reader-on-specific-message", last_msg_id)
# The reset would be effectively done on the next position relative to reset.
# When available, we should test this behaviour with `startMessageIdInclusive` opt.
from_msg_idx = last_msg_idx
for i in range(from_msg_idx + 1, num_of_msgs):
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
reader1.close()
reader2.close()
client.close()
def test_reader_on_specific_message_with_batches(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
"my-python-topic-reader-on-specific-message-with-batches",
batching_enabled=True,
batching_max_publish_delay_ms=1000,
)
for i in range(10):
producer.send_async(b"hello-%d" % i, None)
# Send one sync message to make sure everything was published
producer.send(b"hello-10")
reader1 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", MessageId.earliest)
for i in range(5):
msg = reader1.read_next(TM)
last_msg_id = msg.message_id()
reader2 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", last_msg_id)
for i in range(5, 11):
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
reader1.close()
reader2.close()
client.close()
def test_reader_on_partitioned_topic(self):
num_of_msgs = 100
topic_name = "public/default/my-python-topic-test_reader_on_partitioned_topic"
url1 = self.adminUrl + "/admin/v2/persistent/" + topic_name + "/partitions"
doHttpPut(url1, "4")
client = Client(self.serviceUrl)
producer = client.create_producer(topic_name)
send_array = []
for i in range(num_of_msgs):
data = b"hello-%d" % i
producer.send(data)
send_array.append(data)
reader = client.create_reader(topic_name, MessageId.earliest)
read_array = []
for i in range(num_of_msgs):
msg = reader.read_next(TM)
self.assertTrue(msg)
read_array.append(msg.data())
self.assertListEqual(sorted(send_array), sorted(read_array))
reader.close()
client.close()
def test_reader_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_reader_is_connected"
reader = client.create_reader(topic, MessageId.earliest)
self.assertTrue(reader.is_connected())
reader.close()
self.assertFalse(reader.is_connected())
client.close()
def test_reader_seek_for_message_id(self):
client = pulsar.Client(self.serviceUrl)
topic = "test-seek-for-message-id-" + str(int(time.time()))
producer = client.create_producer(topic)
readerExclusive = client.create_reader(topic, MessageId.latest)
readerInclusive = client.create_reader(topic, MessageId.latest, start_message_id_inclusive=True)
numMessages = 100
seekMessageId = None
r = random.randint(0, numMessages - 2)
for i in range(numMessages):
msg_content = b"msg-%d" % i
id = producer.send(msg_content)
if i == r:
seekMessageId = id
readerExclusive.seek(seekMessageId)
msg0 = readerExclusive.read_next(timeout_millis=3000)
readerInclusive.seek(seekMessageId)
msg1 = readerInclusive.read_next(timeout_millis=3000)
self.assertEqual(msg0.data(), b"msg-%d" % (r + 1))
self.assertEqual(msg1.data(), b"msg-%d" % r)
readerExclusive.close()
readerInclusive.close()
producer.close()
client.close()
def test_producer_sequence_after_reconnection(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
client = Client(self.serviceUrl)
topic = "my-python-test-producer-sequence-after-reconnection-" + str(time.time())
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), -1)
for i in range(10):
producer.send(b"hello-%d" % i)
self.assertEqual(producer.last_sequence_id(), i)
producer.close()
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), 9)
for i in range(10, 20):
producer.send(b"hello-%d" % i)
self.assertEqual(producer.last_sequence_id(), i)
client.close()
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
def test_producer_deduplication(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
client = Client(self.serviceUrl)
topic = "my-python-test-producer-deduplication-" + str(time.time())
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), -1)
consumer = client.subscribe(topic, "my-sub")
producer.send(b"hello-0", sequence_id=0)
producer.send(b"hello-1", sequence_id=1)
producer.send(b"hello-2", sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
# Repeat the messages and verify they're not received by consumer
producer.send(b"hello-1", sequence_id=1)
producer.send(b"hello-2", sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
for i in range(3):
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-%d" % i)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
producer.close()
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), 2)
# Repeat the messages and verify they're not received by consumer
producer.send(b"hello-1", sequence_id=1)
producer.send(b"hello-2", sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
def test_producer_routing_mode(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
"my-python-test-producer", message_routing_mode=PartitionsRoutingMode.UseSinglePartition
)
producer.send(b"test")
client.close()
def test_message_argument_errors(self):
client = Client(self.serviceUrl)
topic = "my-python-test-producer"
producer = client.create_producer(topic)
content = "test".encode("utf-8")
self._check_type_error(lambda: producer.send(5))
self._check_value_error(lambda: producer.send(content, properties="test"))
self._check_value_error(lambda: producer.send(content, partition_key=5))
self._check_value_error(lambda: producer.send(content, sequence_id="test"))
self._check_value_error(lambda: producer.send(content, replication_clusters=5))
self._check_value_error(lambda: producer.send(content, disable_replication="test"))
self._check_value_error(lambda: producer.send(content, event_timestamp="test"))
self._check_value_error(lambda: producer.send(content, deliver_at="test"))
self._check_value_error(lambda: producer.send(content, deliver_after="test"))
client.close()
def test_client_argument_errors(self):
self._check_value_error(lambda: Client(None))
self._check_value_error(lambda: Client(self.serviceUrl, authentication="test"))
self._check_value_error(lambda: Client(self.serviceUrl, operation_timeout_seconds="test"))
self._check_value_error(lambda: Client(self.serviceUrl, stats_interval_in_seconds="test"))
self._check_value_error(lambda: Client(self.serviceUrl, io_threads="test"))
self._check_value_error(lambda: Client(self.serviceUrl, message_listener_threads="test"))
self._check_value_error(lambda: Client(self.serviceUrl, concurrent_lookup_requests="test"))
self._check_value_error(lambda: Client(self.serviceUrl, use_tls="test"))
self._check_value_error(lambda: Client(self.serviceUrl, tls_trust_certs_file_path=5))
self._check_value_error(lambda: Client(self.serviceUrl, tls_allow_insecure_connection="test"))
def test_producer_argument_errors(self):
client = Client(self.serviceUrl)
self._check_value_error(lambda: client.create_producer(None))
topic = "my-python-test-producer"
self._check_value_error(lambda: client.create_producer(topic, producer_name=5))
self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id="test"))
self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis="test"))
self._check_value_error(lambda: client.create_producer(topic, compression_type=None))
self._check_value_error(lambda: client.create_producer(topic, max_pending_messages="test"))
self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms="test"))
client.close()
def test_consumer_argument_errors(self):
client = Client(self.serviceUrl)
topic = "my-python-test-producer"
sub_name = "my-sub-name"
self._check_value_error(lambda: client.subscribe(None, sub_name))
self._check_value_error(lambda: client.subscribe(topic, None))
self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_type=None))
self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size="test"))
self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_name=5))
self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms="test"))
self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms="test"))
client.close()
def test_reader_argument_errors(self):
client = Client(self.serviceUrl)
topic = "my-python-test-producer"
# This should not raise exception
client.create_reader(topic, MessageId.earliest)
self._check_value_error(lambda: client.create_reader(None, MessageId.earliest))
self._check_value_error(lambda: client.create_reader(topic, None))
self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size="test"))
self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
client.close()
def test_get_last_message_id(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://public/default/topic_name_test")
msg_id = producer.send(b"hello")
msg = consumer.receive(TM)
self.assertEqual(msg.message_id(), msg_id)
client.close()
def test_publish_compact_and_consume(self):
client = Client(self.serviceUrl)
topic = "compaction_%s" % (uuid.uuid4())
producer = client.create_producer(topic, producer_name="my-producer-name", batching_enabled=False)
self.assertEqual(producer.last_sequence_id(), -1)
consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
consumer.close()
consumer2 = client.subscribe(topic, "my-sub2", is_read_compacted=False)
# producer create 2 messages with same key.
producer.send(b"hello-0", partition_key="key0")
producer.send(b"hello-1", partition_key="key0")
producer.close()
# issue compact command, and wait success
url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
doHttpPut(url, "")
while True:
s = doHttpGet(url).decode("utf-8")
if "RUNNING" in s:
print(s)
print("Compact still running")
time.sleep(0.2)
else:
print(s)
print("Compact Complete now")
self.assertTrue("SUCCESS" in s)
break
# after compaction completes the compacted ledger is recorded
# as a property of a cursor. As persisting the cursor is async
# and we don't wait for the acknowledgement of the acknowledgement,
# there may be a race if we try to read the compacted ledger immediately.
# therefore wait a second to allow the compacted ledger to be updated on
# the broker.
time.sleep(1.0)
# after compact, consumer with `is_read_compacted=True`, expected read only the second message for same key.
consumer1 = client.subscribe(topic, "my-sub1", is_read_compacted=True)
msg0 = consumer1.receive(TM)
self.assertEqual(msg0.data(), b"hello-1")
consumer1.acknowledge(msg0)
consumer1.close()
# ditto for reader
reader1 = client.create_reader(topic, MessageId.earliest, is_read_compacted=True)
msg0 = reader1.read_next(TM)
self.assertEqual(msg0.data(), b"hello-1")
reader1.close()
# after compact, consumer with `is_read_compacted=False`, expected read 2 messages for same key.
msg0 = consumer2.receive(TM)
self.assertEqual(msg0.data(), b"hello-0")
consumer2.acknowledge(msg0)
msg1 = consumer2.receive(TM)
self.assertEqual(msg1.data(), b"hello-1")
consumer2.acknowledge(msg1)
consumer2.close()
# ditto for reader
reader2 = client.create_reader(topic, MessageId.earliest, is_read_compacted=False)
msg0 = reader2.read_next(TM)
self.assertEqual(msg0.data(), b"hello-0")
msg1 = reader2.read_next(TM)
self.assertEqual(msg1.data(), b"hello-1")
reader2.close()
client.close()
def test_reader_has_message_available(self):
# create client, producer, reader
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic-reader-has-message-available")
reader = client.create_reader("my-python-topic-reader-has-message-available", MessageId.latest)
# before produce data, expected not has message available
self.assertFalse(reader.has_message_available())
for i in range(10):
producer.send(b"hello-%d" % i)
# produced data, expected has message available
self.assertTrue(reader.has_message_available())
for i in range(10):
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
# consumed all data, expected not has message available
self.assertFalse(reader.has_message_available())
for i in range(10, 20):
producer.send(b"hello-%d" % i)
# produced data again, expected has message available
self.assertTrue(reader.has_message_available())
reader.close()
producer.close()
client.close()
def test_seek(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-seek-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
for i in range(100):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
ids = []
timestamps = []
for i in range(100):
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-%d" % i)
ids.append(msg.message_id())
timestamps.append(msg.publish_timestamp())
consumer.acknowledge(msg)
# seek, and after reconnect, expected receive first message.
consumer.seek(MessageId.earliest)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-0")
# seek with wrong type
with self.assertRaises(ValueError, msg="invalid seek_arg type <class 'float'>"):
consumer.seek(1.0)
# seek on messageId
consumer.seek(ids[50])
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-51")
# seek on a user provided MessageId
msg_id = MessageId(ledger_id=ids[60].ledger_id(),
entry_id=ids[60].entry_id())
consumer.seek(msg_id)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-61")
# ditto, but seek on timestamp
consumer.seek(timestamps[42])
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-42")
# repeat with reader
reader = client.create_reader(topic, MessageId.latest)
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
# seek with wrong type
with self.assertRaises(ValueError, msg="invalid seek_arg type <class 'float'>"):
consumer.seek(1.0)
# earliest
reader.seek(MessageId.earliest)
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-0")
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-1")
# seek on messageId
reader.seek(ids[33])
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-34")
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-35")
# seek on a user provided MessageId
msg_id = MessageId(ledger_id=ids[44].ledger_id(),
entry_id=ids[44].entry_id())
reader.seek(msg_id)
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-45")
# seek on timestamp
reader.seek(timestamps[79])
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-79")
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-80")
reader.close()
client.close()
def test_seek_inclusive(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-seek-inclusive-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, start_message_id_inclusive=True)
producer = client.create_producer(topic)
for i in range(100):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
ids = []
for i in range(100):
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-%d" % i)
ids.append(msg.message_id())
consumer.acknowledge(msg)
# seek, and after reconnect, expected receive first message.
consumer.seek(MessageId.earliest)
time.sleep(0.5)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-0")
# seek on messageId
consumer.seek(ids[50])
time.sleep(0.5)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-50")
client.close()
def test_v2_topics(self):
self._v2_topics(self.serviceUrl)
def test_v2_topics_http(self):
self._v2_topics(self.adminUrl)
def _v2_topics(self, url):
client = Client(url)
consumer = client.subscribe("my-v2-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-v2-topic-producer-consumer")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_topics_consumer(self):
client = Client(self.serviceUrl)
topic1 = "persistent://public/default/my-python-topics-consumer-1"
topic2 = "persistent://public/default/my-python-topics-consumer-2"
topic3 = "persistent://public/default-2/my-python-topics-consumer-3" # topic from different namespace
topics = [topic1, topic2, topic3]
url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions"
url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions"
url3 = self.adminUrl + "/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions"
doHttpPut(url1, "2")
doHttpPut(url2, "3")
doHttpPut(url3, "4")
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
consumer = client.subscribe(
topics, "my-topics-consumer-sub", consumer_type=ConsumerType.Shared, receiver_queue_size=10
)
for i in range(100):
producer1.send(b"hello-1-%d" % i)
for i in range(100):
producer2.send(b"hello-2-%d" % i)
for i in range(100):
producer3.send(b"hello-3-%d" % i)
for i in range(300):
msg = consumer.receive(TM)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_topics_pattern_consumer(self):
client = Client(self.serviceUrl)
topics_pattern = "persistent://public/default/my-python-pattern-consumer.*"
topic1 = "persistent://public/default/my-python-pattern-consumer-1"
topic2 = "persistent://public/default/my-python-pattern-consumer-2"
topic3 = "persistent://public/default/my-python-pattern-consumer-3"
url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-1/partitions"
url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-2/partitions"
url3 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-3/partitions"
doHttpPut(url1, "2")
doHttpPut(url2, "3")
doHttpPut(url3, "4")
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
consumer = client.subscribe(
re.compile(topics_pattern),
"my-pattern-consumer-sub",
consumer_type=ConsumerType.Shared,
receiver_queue_size=10,
pattern_auto_discovery_period=1,
)
# wait enough time to trigger auto discovery
time.sleep(2)
for i in range(100):
producer1.send(b"hello-1-%d" % i)
for i in range(100):
producer2.send(b"hello-2-%d" % i)
for i in range(100):
producer3.send(b"hello-3-%d" % i)
for i in range(300):
msg = consumer.receive(TM)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_batch_receive(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-batch-receive-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
start_message_id_inclusive=True, batch_receive_policy=ConsumerBatchReceivePolicy(10, -1, -1))
producer = client.create_producer(topic)
for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
msgs = consumer.batch_receive()
i = 0
for msg in msgs:
self.assertEqual(msg.data(), b"hello-%d" % i)
i += 1
client.close()
def test_message_id(self):
s = MessageId.earliest.serialize()
self.assertEqual(MessageId.deserialize(s), MessageId.earliest)
s = MessageId.latest.serialize()
self.assertEqual(MessageId.deserialize(s), MessageId.latest)
client = Client(self.serviceUrl)
topic = f'test-message-id-compare-{str(time.time())}'
producer = client.create_producer(topic)
consumer = client.subscribe(topic, 'sub')
sent_ids = []
received_ids = []
for i in range(5):
sent_ids.append(MessageId.wrap(producer.send(b'msg-%d' % i)))
msg = consumer.receive(TM)
received_ids.append(MessageId.wrap(msg.message_id()))
self.assertEqual(sent_ids[i], received_ids[i])
consumer.acknowledge(received_ids[i])
consumer.acknowledge_cumulative(received_ids[4])
for i in range(4):
self.assertLess(sent_ids[i], sent_ids[i + 1])
self.assertLessEqual(sent_ids[i], sent_ids[i + 1])
self.assertGreater(sent_ids[i + 1], sent_ids[i])
self.assertGreaterEqual(sent_ids[i + 1], sent_ids[i])
self.assertNotEqual(sent_ids[i], sent_ids[i + 1])
client.close()
def test_get_topics_partitions(self):
client = Client(self.serviceUrl)
topic_partitioned = "persistent://public/default/test_get_topics_partitions"
topic_non_partitioned = "persistent://public/default/test_get_topics_not-partitioned"
url1 = self.adminUrl + "/admin/v2/persistent/public/default/test_get_topics_partitions/partitions"
doHttpPut(url1, "3")
self.assertEqual(
client.get_topic_partitions(topic_partitioned),
[
"persistent://public/default/test_get_topics_partitions-partition-0",
"persistent://public/default/test_get_topics_partitions-partition-1",
"persistent://public/default/test_get_topics_partitions-partition-2",
],
)
self.assertEqual(client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned])
client.close()
def test_token_auth(self):
with open(TOKEN_PATH) as tf:
token = tf.read().strip()
# Use adminUrl to test both HTTP request and binary protocol
client = Client(self.adminUrl, authentication=AuthenticationToken(token))
consumer = client.subscribe(
"persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
client.close()
def test_token_auth_supplier(self):
def read_token():
with open(TOKEN_PATH) as tf:
return tf.read().strip()
client = Client(self.serviceUrl, authentication=AuthenticationToken(read_token))
consumer = client.subscribe(
"persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
client.close()
def test_producer_consumer_zstd(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-producer-consumer-zstd", "my-sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer(
"my-python-topic-producer-consumer-zstd", compression_type=CompressionType.ZSTD
)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
consumer.unsubscribe()
client.close()
def test_client_reference_deleted(self):
def get_producer():
cl = Client(self.serviceUrl)
return cl.create_producer(topic="foobar")
producer = get_producer()
producer.send(b"test_payload")
#####
def test_get_topic_name(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://public/default/topic_name_test")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertEqual(msg.topic_name(), "persistent://public/default/topic_name_test")
client.close()
def test_get_partitioned_topic_name(self):
client = Client(self.serviceUrl)
url1 = self.adminUrl + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
doHttpPut(url1, "3")
partitions = [
"persistent://public/default/partitioned_topic_name_test-partition-0",
"persistent://public/default/partitioned_topic_name_test-partition-1",
"persistent://public/default/partitioned_topic_name_test-partition-2",
]
self.assertEqual(
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions
)
consumer = client.subscribe(
"persistent://public/default/partitioned_topic_name_test",
"partitioned_topic_name_test_sub",
consumer_type=ConsumerType.Shared,
)
producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg.topic_name() in partitions)
client.close()
def test_shutdown_client(self):
client = Client(self.serviceUrl)
producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
producer.send(b"hello")
client.shutdown()
try:
producer.send(b"hello")
self.assertTrue(False)
except pulsar.PulsarException:
# Expected
pass
def test_listener_name_client(self):
client = Client(self.serviceUrl, listener_name='test')
try:
producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
self.fail()
except pulsar.PulsarException:
# Expected
pass
client.close()
def test_negative_acks(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"test_negative_acks", "test", schema=pulsar.schema.StringSchema(), negative_ack_redelivery_delay_ms=1000
)
producer = client.create_producer("test_negative_acks", schema=pulsar.schema.StringSchema())
for i in range(10):
producer.send_async("hello-%d" % i, callback=None)
producer.flush()
for i in range(10):
msg = consumer.receive()
self.assertEqual(msg.value(), "hello-%d" % i)
consumer.negative_acknowledge(msg)
for i in range(10):
msg = consumer.receive()
self.assertEqual(msg.value(), "hello-%d" % i)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_connect_timeout(self):
client = pulsar.Client(
service_url="pulsar://192.0.2.1:1234",
connection_timeout_ms=1000, # 1 second
)
t1 = time.time()
try:
producer = client.create_producer("test_connect_timeout")
self.fail("create_producer should not succeed")
except pulsar.ConnectError as expected:
print("expected error: {} when create producer".format(expected))
t2 = time.time()
self.assertGreater(t2 - t1, 1.0)
self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough
client.close()
def test_json_schema_encode(self):
schema = JsonSchema(TestRecord)
record = TestRecord(a=1, b=2)
# Ensure that encoding a JsonSchema more than once works and produces the same result
first_encode = schema.encode(record)
second_encode = schema.encode(record)
self.assertEqual(first_encode, second_encode)
def test_logger_thread_leaks(self):
def _do_connect(close):
logger = logging.getLogger(str(threading.current_thread().ident))
logger.setLevel(logging.INFO)
client = pulsar.Client(
service_url="pulsar://localhost:6650",
io_threads=4,
message_listener_threads=4,
operation_timeout_seconds=1,
log_conf_file_path=None,
authentication=None,
logger=logger,
)
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
if close:
client.close()
for should_close in (True, False):
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
_do_connect(should_close)
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
threads = []
for _ in range(10):
threads.append(threading.Thread(target=_do_connect, args=(should_close)))
threads[-1].start()
for thread in threads:
thread.join()
assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)
def test_chunking(self):
client = Client(self.serviceUrl)
data_size = 10 * 1024 * 1024
producer = client.create_producer(
'test_chunking',
chunking_enabled=True
)
consumer = client.subscribe('test_chunking', "my-subscription",
max_pending_chunked_message=10,
auto_ack_oldest_chunked_message_on_queue_full=False
)
producer.send(bytes(bytearray(os.urandom(data_size))), None)
msg = consumer.receive(TM)
self.assertEqual(len(msg.data()), data_size)
def test_invalid_chunking_config(self):
client = Client(self.serviceUrl)
self._check_value_error(lambda: client.create_producer(
'test_invalid_chunking_config',
chunking_enabled=True,
batching_enabled=True
))
def _check_value_error(self, fun):
with self.assertRaises(ValueError):
fun()
def _check_type_error(self, fun):
with self.assertRaises(TypeError):
fun()
def _test_basic_auth(self, topic_id: int, auth,
use_tls: bool = False,
tls_private_key_file_path: Optional[str] = None,
tls_certificate_file_path: Optional[str] = None) -> None:
if use_tls:
service_url = self.serviceUrlTls
tls_trust_certs_file_path = CERTS_DIR + 'cacert.pem'
else:
service_url = self.adminUrl
tls_trust_certs_file_path = None
client = Client(service_url, authentication=auth,
tls_trust_certs_file_path=tls_trust_certs_file_path,
tls_private_key_file_path=tls_private_key_file_path,
tls_certificate_file_path=tls_certificate_file_path)
topic = "persistent://private/auth/my-python-topic-basic-auth-" + str(topic_id)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
client.close()
def test_basic_auth(self):
username = "admin"
password = "123456"
self._test_basic_auth(0, AuthenticationBasic(username, password))
self._test_basic_auth(1, AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}"}}'.format(username, password)
))
def test_basic_auth_method(self):
username = "admin"
password = "123456"
self._test_basic_auth(2, AuthenticationBasic(username, password, 'basic'))
with self.assertRaises(pulsar.AuthorizationError):
self._test_basic_auth(3, AuthenticationBasic(username, password, 'unknown'))
self._test_basic_auth(4, AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}", "method": "basic"}}'.format(username, password)
))
with self.assertRaises(pulsar.AuthorizationError):
self._test_basic_auth(5, AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}", "method": "unknown"}}'.format(username, password)
))
def test_tls_encryption_with_other_auth(self):
self._test_basic_auth(6, AuthenticationBasic('admin', '123456'),
use_tls=True,
tls_private_key_file_path=CERTS_DIR + 'client-key.pem',
tls_certificate_file_path=CERTS_DIR + 'client-cert.pem')
with self.assertRaises(pulsar.ConnectError):
self._test_basic_auth(7, AuthenticationBasic('admin', '123456'),
use_tls=True,
tls_private_key_file_path=CERTS_DIR + 'client-cert.pem',
tls_certificate_file_path=CERTS_DIR + 'client-key.pem')
def test_invalid_basic_auth(self):
username = "invalid"
password = "123456"
client = Client(self.adminUrl, authentication=AuthenticationBasic(username, password))
topic = "persistent://private/auth/my-python-topic-invalid-basic-auth"
with self.assertRaises(pulsar.ConnectError):
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
client = Client(self.adminUrl, authentication=AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}"}}'.format(username, password)
))
with self.assertRaises(pulsar.ConnectError):
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
with self.assertRaises(RuntimeError):
AuthenticationBasic(auth_params_string='invalid auth params')
def test_send_async_no_deadlock(self):
client = Client(self.serviceUrl)
producer = client.create_producer('test_send_async_no_deadlock')
def send_callback(res, msg):
print(f"Message '{msg}' published res={res}")
for i in range(30):
producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)
producer.flush()
client.close()
def test_keyshare_policy(self):
with self.assertRaises(ValueError):
# Raise error because sticky ranges are not provided.
pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
)
expected_key_shared_mode = pulsar.KeySharedMode.Sticky
expected_allow_out_of_order_delivery = True
expected_sticky_ranges = [(0, 100), (101,200)]
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=expected_key_shared_mode,
allow_out_of_order_delivery=expected_allow_out_of_order_delivery,
sticky_ranges=expected_sticky_ranges
)
self.assertEqual(consumer_key_shared_policy.key_shared_mode, expected_key_shared_mode)
self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, expected_allow_out_of_order_delivery)
self.assertEqual(consumer_key_shared_policy.sticky_ranges, expected_sticky_ranges)
def test_keyshared_invalid_sticky_ranges(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-invalid-" + str(time.time())
with self.assertRaises(ValueError):
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
sticky_ranges=[(0,65536)]
)
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
start_message_id_inclusive=True,
key_shared_policy=consumer_key_shared_policy)
with self.assertRaises(ValueError):
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
sticky_ranges=[(0, 100), (50, 150)]
)
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
start_message_id_inclusive=True,
key_shared_policy=consumer_key_shared_policy)
def test_keyshared_autosplit(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-autosplit-" + str(time.time())
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.AutoSplit,
allow_out_of_order_delivery=True,
)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
producer = client.create_producer(topic)
for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
msgs = []
while True:
try:
msg = consumer.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer.acknowledge(msg)
while True:
try:
msg = consumer2.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer2.acknowledge(msg)
self.assertEqual(len(msgs), 10)
client.close()
def test_sticky_autosplit(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-sticky-" + str(time.time())
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=True,
sticky_ranges=[(0,30000)],
)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-1',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=True,
sticky_ranges=[(30001, 65535)],
)
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-2',
start_message_id_inclusive=True, key_shared_policy=consumer2_key_shared_policy)
producer = client.create_producer(topic)
for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
msgs = []
while True:
try:
msg = consumer.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer.acknowledge(msg)
while True:
try:
msg = consumer2.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer2.acknowledge(msg)
self.assertEqual(len(msgs), 10)
client.close()
def test_acknowledge_failed(self):
client = Client(self.serviceUrl)
topic = 'test_acknowledge_failed'
producer = client.create_producer(topic)
consumer1 = client.subscribe(topic, 'sub1', consumer_type=ConsumerType.Shared)
consumer2 = client.subscribe(topic, 'sub2', consumer_type=ConsumerType.KeyShared)
msg_id = producer.send('hello'.encode())
msg1 = consumer1.receive()
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer1.acknowledge_cumulative(msg1)
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer1.acknowledge_cumulative(msg1.message_id())
msg2 = consumer2.receive()
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer2.acknowledge_cumulative(msg2)
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer2.acknowledge_cumulative(msg2.message_id())
consumer = client.subscribe([topic, topic + '-another'], 'sub')
# The message id does not have a topic name
with self.assertRaises(pulsar.OperationNotSupported):
consumer.acknowledge(msg_id)
client.close()
def test_batch_index_ack(self):
topic_name = 'test-batch-index-ack-3'
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(topic_name,
batching_enabled=True,
batching_max_messages=100,
batching_max_publish_delay_ms=10000)
consumer = client.subscribe(topic_name,
subscription_name='test-batch-index-ack',
batch_index_ack_enabled=True)
# Make sure send 0~5 is a batch msg.
for i in range(5):
producer.send_async(b"hello-%d" % i, callback=None)
producer.flush()
# Receive msgs and just ack 0, 1 msgs
results = []
for i in range(5):
msg = consumer.receive()
print("receive from {}".format(msg.message_id()))
results.append(msg)
assert len(results) == 5
for i in range(2):
consumer.acknowledge(results[i])
time.sleep(0.2)
# Restart consumer after, just receive 2~5 msg.
consumer.close()
consumer = client.subscribe(topic_name,
subscription_name='test-batch-index-ack',
batch_index_ack_enabled=True)
results2 = []
for i in range(2, 5):
msg = consumer.receive()
results2.append(msg)
assert len(results2) == 3
# assert no more msgs.
with self.assertRaises(pulsar.Timeout):
consumer.receive(timeout_millis=1000)
client.close()
def test_dead_letter_policy_config(self):
with self.assertRaises(ValueError):
ConsumerDeadLetterPolicy(-1)
policy = ConsumerDeadLetterPolicy(10)
self.assertEqual(10, policy.max_redeliver_count)
self.assertEqual("", policy.dead_letter_topic)
self.assertEqual("", policy.initial_subscription_name)
def test_dead_letter_policy(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-test-dlq" + str(time.time())
dlq_topic = 'dlq-' + topic
max_redeliver_count = 5
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
dead_letter_policy=ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub'))
dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared)
# Sen num msgs.
producer = client.create_producer(topic)
num = 10
for i in range(num):
producer.send(b"hello-%d" % i)
producer.flush()
# Redelivery all messages maxRedeliverCountNum time.
for i in range(1, num * max_redeliver_count + num + 1):
msg = consumer.receive()
if i % num == 0:
consumer.redeliver_unacknowledged_messages()
print(f"Start redeliver msgs '{i}'")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
for i in range(num):
msg = dlq_consumer.receive()
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
dlq_consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
dlq_consumer.receive(100)
client.close()
def test_regex_subscription(self):
client = Client(self.serviceUrl)
topic1 = "persistent://public/default/test-regex-sub-1"
topic2 = "persistent://public/default/test-regex-sub-2"
topic3 = "non-persistent://public/default/test-regex-sub-3"
topic4 = "persistent://public/default/no-match-test-regex-sub-3" # no match pattern rule topic.
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
producer4 = client.create_producer(topic4)
consumer_all = client.subscribe(
re.compile('public/default/test-regex-sub-.*'), "regex-sub-all",
consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.AllTopics
)
consumer_persistent = client.subscribe(
re.compile('public/default/test-regex-sub-.*'), "regex-sub-persistent",
consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.PersistentOnly
)
consumer_non_persistent = client.subscribe(
re.compile('public/default/test-regex-sub-.*'), "regex-sub-non-persistent",
consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.NonPersistentOnly
)
num = 10
for i in range(num):
producer1.send(b"hello-1-%d" % i)
producer2.send(b"hello-2-%d" % i)
producer3.send(b"hello-3-%d" % i)
producer4.send(b"hello-4-%d" % i)
# Assert consumer_all.
received_topics = set()
for i in range(3 * num):
msg = consumer_all.receive(TM)
topic_name = msg.topic_name()
self.assertIn(topic_name, [topic1, topic2, topic3])
received_topics.add(topic_name)
consumer_all.acknowledge(msg)
self.assertEqual(received_topics, {topic1, topic2, topic3})
with self.assertRaises(pulsar.Timeout):
consumer_all.receive(100)
# Assert consumer_persistent.
received_topics.clear()
for i in range(2 * num):
msg = consumer_persistent.receive(TM)
topic_name = msg.topic_name()
self.assertIn(topic_name, [topic1, topic2])
received_topics.add(topic_name)
consumer_persistent.acknowledge(msg)
self.assertEqual(received_topics, {topic1, topic2})
with self.assertRaises(pulsar.Timeout):
consumer_persistent.receive(100)
# Assert consumer_non_persistent.
received_topics.clear()
for i in range(num):
msg = consumer_non_persistent.receive(TM)
topic_name = msg.topic_name()
self.assertIn(topic_name, [topic3])
received_topics.add(topic_name)
consumer_non_persistent.acknowledge(msg)
self.assertEqual(received_topics, {topic3})
with self.assertRaises(pulsar.Timeout):
consumer_non_persistent.receive(100)
client.close()
def test_consumer_name(self):
client = Client(self.serviceUrl)
name = 'my-consumer-name'
consumer = client.subscribe('test_consumer_name', 'sub', consumer_name=name)
self.assertEqual(consumer.consumer_name(), name)
client.close()
if __name__ == "__main__":
main()