blob: bb62d9922f4ce630d08bb3d152f573dfa376b7f7 [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import random
import threading
import logging
from unittest import TestCase, main
import time
import os
import re
import pulsar
import uuid
from datetime import timedelta
from pulsar import (
Client,
MessageId,
CompressionType,
ConsumerType,
KeySharedMode,
ConsumerKeySharedPolicy,
PartitionsRoutingMode,
AuthenticationBasic,
AuthenticationTLS,
Authentication,
AuthenticationToken,
InitialPosition,
CryptoKeyReader,
ConsumerBatchReceivePolicy,
ProducerAccessMode,
ConsumerDeadLetterPolicy,
)
from pulsar.schema import JsonSchema, Record, Integer
from _pulsar import ProducerConfiguration, ConsumerConfiguration, RegexSubscriptionMode
from schema_test import *
from reader_test import *
from urllib.request import urlopen, Request
TM = 10000 # Do not wait forever in tests
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
TOKEN_PATH = TESTS_DIR + "/.test-token.txt"
CERTS_DIR = TESTS_DIR + "/test-conf/"
def doHttpPost(url, data):
req = Request(url, data.encode())
req.add_header("Content-Type", "application/json")
urlopen(req)
def doHttpPut(url, data):
try:
req = Request(url, data.encode())
req.add_header("Content-Type", "application/json")
req.get_method = lambda: "PUT"
urlopen(req)
except Exception as ex:
# ignore conflicts exception to have test idempotency
if "409" in str(ex):
pass
else:
raise ex
def doHttpGet(url):
req = Request(url)
req.add_header("Accept", "application/json")
return urlopen(req).read()
class TestRecord(Record):
a = Integer()
b = Integer()
class PulsarTest(TestCase):
serviceUrl = "pulsar://localhost:6650"
adminUrl = "http://localhost:8080"
serviceUrlTls = "pulsar+ssl://localhost:6651"
def test_producer_config(self):
conf = ProducerConfiguration()
conf.send_timeout_millis(12)
self.assertEqual(conf.send_timeout_millis(), 12)
self.assertEqual(conf.compression_type(), CompressionType.NONE)
conf.compression_type(CompressionType.LZ4)
self.assertEqual(conf.compression_type(), CompressionType.LZ4)
conf.max_pending_messages(120)
self.assertEqual(conf.max_pending_messages(), 120)
def test_consumer_config(self):
conf = ConsumerConfiguration()
self.assertEqual(conf.consumer_type(), ConsumerType.Exclusive)
conf.consumer_type(ConsumerType.Shared)
self.assertEqual(conf.consumer_type(), ConsumerType.Shared)
self.assertEqual(conf.consumer_name(), "")
conf.consumer_name("my-name")
self.assertEqual(conf.consumer_name(), "my-name")
self.assertEqual(conf.replicate_subscription_state_enabled(), False)
conf.replicate_subscription_state_enabled(True)
self.assertEqual(conf.replicate_subscription_state_enabled(), True)
def test_connect_error(self):
with self.assertRaises(ValueError):
Client("fakeServiceUrl")
def test_exception_inheritance(self):
assert issubclass(pulsar.ConnectError, pulsar.PulsarException)
assert issubclass(pulsar.PulsarException, Exception)
def test_simple_producer(self):
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic")
producer.send(b"hello")
producer.close()
client.close()
def test_producer_send_async(self):
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic")
sent_messages = []
def send_callback(producer, msg):
sent_messages.append(msg)
producer.send_async(b"hello", send_callback)
producer.send_async(b"hello", send_callback)
producer.send_async(b"hello", send_callback)
i = 0
while len(sent_messages) < 3 and i < 100:
time.sleep(0.1)
i += 1
self.assertEqual(len(sent_messages), 3)
client.close()
def test_producer_send(self):
client = Client(self.serviceUrl)
topic = "test_producer_send"
producer = client.create_producer(topic)
consumer = client.subscribe(topic, "sub-name")
msg_id = producer.send(b"hello")
print("send to {}".format(msg_id))
msg = consumer.receive(TM)
consumer.acknowledge(msg)
print("receive from {}".format(msg.message_id()))
self.assertEqual(msg_id, msg.message_id())
client.close()
def test_producer_access_mode_exclusive(self):
client = Client(self.serviceUrl)
topic_name = "test-access-mode-exclusive"
client.create_producer(topic_name, producer_name="p1", access_mode=ProducerAccessMode.Exclusive)
with self.assertRaises(pulsar.ProducerFenced):
client.create_producer(topic_name, producer_name="p2", access_mode=ProducerAccessMode.Exclusive)
client.close()
def test_producer_access_mode_wait_exclusive(self):
client = Client(self.serviceUrl)
topic_name = "test_producer_access_mode_wait_exclusive"
producer1 = client.create_producer(
topic=topic_name,
producer_name='p-1',
access_mode=ProducerAccessMode.Exclusive
)
assert producer1.producer_name() == 'p-1'
# when p1 close, p2 success created.
producer1.close()
producer2 = client.create_producer(
topic=topic_name,
producer_name='p-2',
access_mode=ProducerAccessMode.WaitForExclusive
)
assert producer2.producer_name() == 'p-2'
producer2.close()
client.close()
def test_producer_access_mode_exclusive_with_fencing(self):
client = Client(self.serviceUrl)
topic_name = 'test_producer_access_mode_exclusive_with_fencing'
producer1 = client.create_producer(
topic=topic_name,
producer_name='p-1',
access_mode=ProducerAccessMode.Exclusive
)
assert producer1.producer_name() == 'p-1'
producer2 = client.create_producer(
topic=topic_name,
producer_name='p-2',
access_mode=ProducerAccessMode.ExclusiveWithFencing
)
assert producer2.producer_name() == 'p-2'
# producer1 will be fenced.
with self.assertRaises((pulsar.ProducerFenced, pulsar.AlreadyClosed)):
producer1.send('test-msg'.encode('utf-8'))
# sleep 200ms to make sure producer1 is close done.
time.sleep(0.2)
producer2.close()
client.close()
def test_producer_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_producer_is_connected"
producer = client.create_producer(topic)
self.assertTrue(producer.is_connected())
producer.close()
self.assertFalse(producer.is_connected())
client.close()
def test_producer_consumer(self):
client = Client(self.serviceUrl)
consumer = client.subscribe("my-python-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-python-topic-producer-consumer")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
consumer.unsubscribe()
client.close()
def test_ordering_key(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-ordering-key", "my-sub", consumer_type=ConsumerType.KeyShared
)
producer = client.create_producer("my-python-topic-ordering-key")
producer.send(b"ordered-hello", ordering_key="random-ordering-key")
# Message should be available immediately with ordering key set
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"ordered-hello")
self.assertEqual(msg.ordering_key(), "random-ordering-key")
consumer.unsubscribe()
producer.close()
client.close()
def test_redelivery_count(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-redelivery-count",
"my-sub",
consumer_type=ConsumerType.Shared,
negative_ack_redelivery_delay_ms=500,
)
producer = client.create_producer("my-python-topic-redelivery-count")
producer.send(b"hello")
redelivery_count = 0
for i in range(4):
msg = consumer.receive(TM)
print("Received message %s" % msg.data())
if i % 2 == 0:
consumer.negative_acknowledge(msg)
else:
consumer.negative_acknowledge(msg.message_id())
redelivery_count = msg.redelivery_count()
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
self.assertEqual(3, redelivery_count)
consumer.unsubscribe()
producer.close()
client.close()
def test_deliver_at(self):
client = Client(self.serviceUrl)
consumer = client.subscribe("my-python-topic-deliver-at", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-python-topic-deliver-at")
# Delay message in 1.1s
producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + 1100)
# Message should not be available in the next second
with self.assertRaises(pulsar.Timeout):
consumer.receive(1000)
# Message should be published now
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.unsubscribe()
producer.close()
client.close()
def test_deliver_after(self):
client = Client(self.serviceUrl)
consumer = client.subscribe("my-python-topic-deliver-after", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-python-topic-deliver-after")
# Delay message in 1.1s
producer.send(b"hello", deliver_after=timedelta(milliseconds=1100))
# Message should not be available in the next second
with self.assertRaises(pulsar.Timeout):
consumer.receive(1000)
# Message should be published in the next 500ms
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.unsubscribe()
producer.close()
client.close()
def test_consumer_initial_position(self):
client = Client(self.serviceUrl)
producer = client.create_producer("consumer-initial-position")
# Sending 5 messages before consumer creation.
# These should be received with initial_position set to Earliest but not with Latest.
for i in range(5):
producer.send(b"hello-%d" % i)
consumer = client.subscribe(
"consumer-initial-position",
"my-sub",
consumer_type=ConsumerType.Shared,
initial_position=InitialPosition.Earliest,
)
# Sending 5 other messages that should be received regardless of the initial_position.
for i in range(5, 10):
producer.send(b"hello-%d" % i)
for i in range(10):
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
consumer.unsubscribe()
client.close()
def test_consumer_queue_size_is_zero(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-consumer-init-queue-size-is-zero",
"my-sub",
consumer_type=ConsumerType.Shared,
receiver_queue_size=0,
initial_position=InitialPosition.Earliest,
)
producer = client.create_producer("my-python-topic-consumer-init-queue-size-is-zero")
producer.send(b"hello")
time.sleep(0.1)
msg = consumer.receive()
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.unsubscribe()
client.close()
def test_message_properties(self):
client = Client(self.serviceUrl)
topic = "my-python-test-message-properties"
consumer = client.subscribe(
topic=topic, subscription_name="my-subscription", schema=pulsar.schema.StringSchema()
)
producer = client.create_producer(topic=topic, schema=pulsar.schema.StringSchema())
producer.send("hello", properties={"a": "1", "b": "2"})
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), "hello")
self.assertEqual(msg.properties(), {"a": "1", "b": "2"})
consumer.unsubscribe()
client.close()
def test_tls_auth(self):
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=AuthenticationTLS(CERTS_DIR + "client-cert.pem", CERTS_DIR + "client-key.pem"),
)
topic = "my-python-topic-tls-auth-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_tls_auth2(self):
authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams),
)
topic = "my-python-topic-tls-auth-2-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_encryption(self):
publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption"
consumer = client.subscribe(
topic=topic, subscription_name="my-subscription", crypto_key_reader=crypto_key_reader
)
producer = client.create_producer(
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
)
reader = client.create_reader(
topic=topic, start_message_id=MessageId.earliest, crypto_key_reader=crypto_key_reader
)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), b"hello")
consumer.unsubscribe()
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
reader.close()
client.close()
def test_tls_auth3(self):
authPlugin = "tls"
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams),
)
topic = "my-python-topic-tls-auth-3-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_auth_junk_params(self):
authPlugin = "someoldjunk.so"
authParams = "blah"
client = Client(
self.serviceUrlTls,
tls_trust_certs_file_path=CERTS_DIR + "cacert.pem",
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams),
)
with self.assertRaises(pulsar.AuthenticationError):
client.subscribe("my-python-topic-auth-junk-params", "my-sub", consumer_type=ConsumerType.Shared)
def test_message_listener(self):
client = Client(self.serviceUrl)
received_messages = []
def listener(consumer, msg):
print("Got message: %s" % msg)
received_messages.append(msg)
consumer.acknowledge(msg)
client.subscribe(
"my-python-topic-listener", "my-sub", consumer_type=ConsumerType.Exclusive, message_listener=listener
)
producer = client.create_producer("my-python-topic-listener")
producer.send(b"hello-1")
producer.send(b"hello-2")
producer.send(b"hello-3")
time.sleep(0.1)
self.assertEqual(len(received_messages), 3)
self.assertEqual(received_messages[0].data(), b"hello-1")
self.assertEqual(received_messages[1].data(), b"hello-2")
self.assertEqual(received_messages[2].data(), b"hello-3")
client.close()
def test_consumer_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_consumer_is_connected"
sub = "sub"
consumer = client.subscribe(topic, sub)
self.assertTrue(consumer.is_connected())
consumer.close()
self.assertFalse(consumer.is_connected())
client.close()
def test_reader_simple(self):
client = Client(self.serviceUrl)
reader = client.create_reader("my-python-topic-reader-simple", MessageId.earliest)
producer = client.create_producer("my-python-topic-reader-simple")
producer.send(b"hello")
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
reader.close()
client.close()
def test_reader_on_last_message(self):
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic-reader-on-last-message")
for i in range(10):
producer.send(b"hello-%d" % i)
reader = client.create_reader("my-python-topic-reader-on-last-message", MessageId.latest)
for i in range(10, 20):
producer.send(b"hello-%d" % i)
for i in range(10, 20):
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
reader.close()
client.close()
def test_reader_on_specific_message(self):
num_of_msgs = 10
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic-reader-on-specific-message")
for i in range(num_of_msgs):
producer.send(b"hello-%d" % i)
reader1 = client.create_reader("my-python-topic-reader-on-specific-message", MessageId.earliest)
for i in range(num_of_msgs // 2):
msg = reader1.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
last_msg_id = msg.message_id()
last_msg_idx = i
reader2 = client.create_reader("my-python-topic-reader-on-specific-message", last_msg_id)
# The reset would be effectively done on the next position relative to reset.
# When available, we should test this behaviour with `startMessageIdInclusive` opt.
from_msg_idx = last_msg_idx
for i in range(from_msg_idx + 1, num_of_msgs):
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
reader1.close()
reader2.close()
client.close()
def test_reader_on_specific_message_with_batches(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
"my-python-topic-reader-on-specific-message-with-batches",
batching_enabled=True,
batching_max_publish_delay_ms=1000,
)
for i in range(10):
producer.send_async(b"hello-%d" % i, None)
# Send one sync message to make sure everything was published
producer.send(b"hello-10")
reader1 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", MessageId.earliest)
for i in range(5):
msg = reader1.read_next(TM)
last_msg_id = msg.message_id()
reader2 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", last_msg_id)
for i in range(5, 11):
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
reader1.close()
reader2.close()
client.close()
def test_reader_on_partitioned_topic(self):
num_of_msgs = 100
topic_name = "public/default/my-python-topic-test_reader_on_partitioned_topic"
url1 = self.adminUrl + "/admin/v2/persistent/" + topic_name + "/partitions"
doHttpPut(url1, "4")
client = Client(self.serviceUrl)
producer = client.create_producer(topic_name)
send_array = []
for i in range(num_of_msgs):
data = b"hello-%d" % i
producer.send(data)
send_array.append(data)
reader = client.create_reader(topic_name, MessageId.earliest)
read_array = []
for i in range(num_of_msgs):
msg = reader.read_next(TM)
self.assertTrue(msg)
read_array.append(msg.data())
self.assertListEqual(sorted(send_array), sorted(read_array))
reader.close()
client.close()
def test_reader_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_reader_is_connected"
reader = client.create_reader(topic, MessageId.earliest)
self.assertTrue(reader.is_connected())
reader.close()
self.assertFalse(reader.is_connected())
client.close()
def test_reader_seek_for_message_id(self):
client = pulsar.Client(self.serviceUrl)
topic = "test-seek-for-message-id-" + str(int(time.time()))
producer = client.create_producer(topic)
readerExclusive = client.create_reader(topic, MessageId.latest)
readerInclusive = client.create_reader(topic, MessageId.latest, start_message_id_inclusive=True)
numMessages = 100
seekMessageId = None
r = random.randint(0, numMessages - 2)
for i in range(numMessages):
msg_content = b"msg-%d" % i
id = producer.send(msg_content)
if i == r:
seekMessageId = id
readerExclusive.seek(seekMessageId)
msg0 = readerExclusive.read_next(timeout_millis=3000)
readerInclusive.seek(seekMessageId)
msg1 = readerInclusive.read_next(timeout_millis=3000)
self.assertEqual(msg0.data(), b"msg-%d" % (r + 1))
self.assertEqual(msg1.data(), b"msg-%d" % r)
readerExclusive.close()
readerInclusive.close()
producer.close()
client.close()
def test_producer_sequence_after_reconnection(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
client = Client(self.serviceUrl)
topic = "my-python-test-producer-sequence-after-reconnection-" + str(time.time())
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), -1)
for i in range(10):
producer.send(b"hello-%d" % i)
self.assertEqual(producer.last_sequence_id(), i)
producer.close()
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), 9)
for i in range(10, 20):
producer.send(b"hello-%d" % i)
self.assertEqual(producer.last_sequence_id(), i)
client.close()
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
def test_producer_deduplication(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
client = Client(self.serviceUrl)
topic = "my-python-test-producer-deduplication-" + str(time.time())
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), -1)
consumer = client.subscribe(topic, "my-sub")
producer.send(b"hello-0", sequence_id=0)
producer.send(b"hello-1", sequence_id=1)
producer.send(b"hello-2", sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
# Repeat the messages and verify they're not received by consumer
producer.send(b"hello-1", sequence_id=1)
producer.send(b"hello-2", sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
for i in range(3):
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-%d" % i)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
producer.close()
producer = client.create_producer(topic, producer_name="my-producer-name")
self.assertEqual(producer.last_sequence_id(), 2)
# Repeat the messages and verify they're not received by consumer
producer.send(b"hello-1", sequence_id=1)
producer.send(b"hello-2", sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
def test_producer_routing_mode(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
"my-python-test-producer", message_routing_mode=PartitionsRoutingMode.UseSinglePartition
)
producer.send(b"test")
client.close()
def test_message_argument_errors(self):
client = Client(self.serviceUrl)
topic = "my-python-test-producer"
producer = client.create_producer(topic)
content = "test".encode("utf-8")
self._check_type_error(lambda: producer.send(5))
self._check_value_error(lambda: producer.send(content, properties="test"))
self._check_value_error(lambda: producer.send(content, partition_key=5))
self._check_value_error(lambda: producer.send(content, sequence_id="test"))
self._check_value_error(lambda: producer.send(content, replication_clusters=5))
self._check_value_error(lambda: producer.send(content, disable_replication="test"))
self._check_value_error(lambda: producer.send(content, event_timestamp="test"))
self._check_value_error(lambda: producer.send(content, deliver_at="test"))
self._check_value_error(lambda: producer.send(content, deliver_after="test"))
client.close()
def test_client_argument_errors(self):
self._check_value_error(lambda: Client(None))
self._check_value_error(lambda: Client(self.serviceUrl, authentication="test"))
self._check_value_error(lambda: Client(self.serviceUrl, operation_timeout_seconds="test"))
self._check_value_error(lambda: Client(self.serviceUrl, io_threads="test"))
self._check_value_error(lambda: Client(self.serviceUrl, message_listener_threads="test"))
self._check_value_error(lambda: Client(self.serviceUrl, concurrent_lookup_requests="test"))
self._check_value_error(lambda: Client(self.serviceUrl, use_tls="test"))
self._check_value_error(lambda: Client(self.serviceUrl, tls_trust_certs_file_path=5))
self._check_value_error(lambda: Client(self.serviceUrl, tls_allow_insecure_connection="test"))
def test_producer_argument_errors(self):
client = Client(self.serviceUrl)
self._check_value_error(lambda: client.create_producer(None))
topic = "my-python-test-producer"
self._check_value_error(lambda: client.create_producer(topic, producer_name=5))
self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id="test"))
self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis="test"))
self._check_value_error(lambda: client.create_producer(topic, compression_type=None))
self._check_value_error(lambda: client.create_producer(topic, max_pending_messages="test"))
self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes="test"))
self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms="test"))
client.close()
def test_consumer_argument_errors(self):
client = Client(self.serviceUrl)
topic = "my-python-test-producer"
sub_name = "my-sub-name"
self._check_value_error(lambda: client.subscribe(None, sub_name))
self._check_value_error(lambda: client.subscribe(topic, None))
self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_type=None))
self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size="test"))
self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_name=5))
self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms="test"))
self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms="test"))
client.close()
def test_reader_argument_errors(self):
client = Client(self.serviceUrl)
topic = "my-python-test-producer"
# This should not raise exception
client.create_reader(topic, MessageId.earliest)
self._check_value_error(lambda: client.create_reader(None, MessageId.earliest))
self._check_value_error(lambda: client.create_reader(topic, None))
self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size="test"))
self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
client.close()
def test_get_last_message_id(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://public/default/topic_name_test")
msg_id = producer.send(b"hello")
msg = consumer.receive(TM)
self.assertEqual(msg.message_id(), msg_id)
client.close()
def test_publish_compact_and_consume(self):
client = Client(self.serviceUrl)
topic = "compaction_%s" % (uuid.uuid4())
producer = client.create_producer(topic, producer_name="my-producer-name", batching_enabled=False)
self.assertEqual(producer.last_sequence_id(), -1)
consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
consumer.close()
consumer2 = client.subscribe(topic, "my-sub2", is_read_compacted=False)
# producer create 2 messages with same key.
producer.send(b"hello-0", partition_key="key0")
producer.send(b"hello-1", partition_key="key0")
producer.close()
# issue compact command, and wait success
url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
doHttpPut(url, "")
while True:
s = doHttpGet(url).decode("utf-8")
if "RUNNING" in s:
print(s)
print("Compact still running")
time.sleep(0.2)
else:
print(s)
print("Compact Complete now")
self.assertTrue("SUCCESS" in s)
break
# after compaction completes the compacted ledger is recorded
# as a property of a cursor. As persisting the cursor is async
# and we don't wait for the acknowledgement of the acknowledgement,
# there may be a race if we try to read the compacted ledger immediately.
# therefore wait a second to allow the compacted ledger to be updated on
# the broker.
time.sleep(1.0)
# after compact, consumer with `is_read_compacted=True`, expected read only the second message for same key.
consumer1 = client.subscribe(topic, "my-sub1", is_read_compacted=True)
msg0 = consumer1.receive(TM)
self.assertEqual(msg0.data(), b"hello-1")
consumer1.acknowledge(msg0)
consumer1.close()
# ditto for reader
reader1 = client.create_reader(topic, MessageId.earliest, is_read_compacted=True)
msg0 = reader1.read_next(TM)
self.assertEqual(msg0.data(), b"hello-1")
reader1.close()
# after compact, consumer with `is_read_compacted=False`, expected read 2 messages for same key.
msg0 = consumer2.receive(TM)
self.assertEqual(msg0.data(), b"hello-0")
consumer2.acknowledge(msg0)
msg1 = consumer2.receive(TM)
self.assertEqual(msg1.data(), b"hello-1")
consumer2.acknowledge(msg1)
consumer2.close()
# ditto for reader
reader2 = client.create_reader(topic, MessageId.earliest, is_read_compacted=False)
msg0 = reader2.read_next(TM)
self.assertEqual(msg0.data(), b"hello-0")
msg1 = reader2.read_next(TM)
self.assertEqual(msg1.data(), b"hello-1")
reader2.close()
client.close()
def test_reader_has_message_available(self):
# create client, producer, reader
client = Client(self.serviceUrl)
producer = client.create_producer("my-python-topic-reader-has-message-available")
reader = client.create_reader("my-python-topic-reader-has-message-available", MessageId.latest)
# before produce data, expected not has message available
self.assertFalse(reader.has_message_available())
for i in range(10):
producer.send(b"hello-%d" % i)
# produced data, expected has message available
self.assertTrue(reader.has_message_available())
for i in range(10):
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
# consumed all data, expected not has message available
self.assertFalse(reader.has_message_available())
for i in range(10, 20):
producer.send(b"hello-%d" % i)
# produced data again, expected has message available
self.assertTrue(reader.has_message_available())
reader.close()
producer.close()
client.close()
def test_seek(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-seek-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
for i in range(100):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
ids = []
timestamps = []
for i in range(100):
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-%d" % i)
ids.append(msg.message_id())
timestamps.append(msg.publish_timestamp())
consumer.acknowledge(msg)
# seek, and after reconnect, expected receive first message.
consumer.seek(MessageId.earliest)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-0")
# seek on messageId
consumer.seek(ids[50])
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-51")
# ditto, but seek on timestamp
consumer.seek(timestamps[42])
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-42")
# repeat with reader
reader = client.create_reader(topic, MessageId.latest)
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
# earliest
reader.seek(MessageId.earliest)
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-0")
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-1")
# seek on messageId
reader.seek(ids[33])
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-34")
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-35")
# seek on timestamp
reader.seek(timestamps[79])
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-79")
msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-80")
reader.close()
client.close()
def test_seek_inclusive(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-seek-inclusive-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, start_message_id_inclusive=True)
producer = client.create_producer(topic)
for i in range(100):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
ids = []
for i in range(100):
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-%d" % i)
ids.append(msg.message_id())
consumer.acknowledge(msg)
# seek, and after reconnect, expected receive first message.
consumer.seek(MessageId.earliest)
time.sleep(0.5)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-0")
# seek on messageId
consumer.seek(ids[50])
time.sleep(0.5)
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b"hello-50")
client.close()
def test_v2_topics(self):
self._v2_topics(self.serviceUrl)
def test_v2_topics_http(self):
self._v2_topics(self.adminUrl)
def _v2_topics(self, url):
client = Client(url)
consumer = client.subscribe("my-v2-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer("my-v2-topic-producer-consumer")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_topics_consumer(self):
client = Client(self.serviceUrl)
topic1 = "persistent://public/default/my-python-topics-consumer-1"
topic2 = "persistent://public/default/my-python-topics-consumer-2"
topic3 = "persistent://public/default-2/my-python-topics-consumer-3" # topic from different namespace
topics = [topic1, topic2, topic3]
url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions"
url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions"
url3 = self.adminUrl + "/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions"
doHttpPut(url1, "2")
doHttpPut(url2, "3")
doHttpPut(url3, "4")
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
consumer = client.subscribe(
topics, "my-topics-consumer-sub", consumer_type=ConsumerType.Shared, receiver_queue_size=10
)
for i in range(100):
producer1.send(b"hello-1-%d" % i)
for i in range(100):
producer2.send(b"hello-2-%d" % i)
for i in range(100):
producer3.send(b"hello-3-%d" % i)
for i in range(300):
msg = consumer.receive(TM)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_topics_pattern_consumer(self):
client = Client(self.serviceUrl)
topics_pattern = "persistent://public/default/my-python-pattern-consumer.*"
topic1 = "persistent://public/default/my-python-pattern-consumer-1"
topic2 = "persistent://public/default/my-python-pattern-consumer-2"
topic3 = "persistent://public/default/my-python-pattern-consumer-3"
url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-1/partitions"
url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-2/partitions"
url3 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-3/partitions"
doHttpPut(url1, "2")
doHttpPut(url2, "3")
doHttpPut(url3, "4")
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
consumer = client.subscribe(
re.compile(topics_pattern),
"my-pattern-consumer-sub",
consumer_type=ConsumerType.Shared,
receiver_queue_size=10,
pattern_auto_discovery_period=1,
)
# wait enough time to trigger auto discovery
time.sleep(2)
for i in range(100):
producer1.send(b"hello-1-%d" % i)
for i in range(100):
producer2.send(b"hello-2-%d" % i)
for i in range(100):
producer3.send(b"hello-3-%d" % i)
for i in range(300):
msg = consumer.receive(TM)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_batch_receive(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-batch-receive-" + str(time.time())
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
start_message_id_inclusive=True, batch_receive_policy=ConsumerBatchReceivePolicy(10, -1, -1))
producer = client.create_producer(topic)
for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
msgs = consumer.batch_receive()
i = 0
for msg in msgs:
self.assertEqual(msg.data(), b"hello-%d" % i)
i += 1
client.close()
def test_message_id(self):
s = MessageId.earliest.serialize()
self.assertEqual(MessageId.deserialize(s), MessageId.earliest)
s = MessageId.latest.serialize()
self.assertEqual(MessageId.deserialize(s), MessageId.latest)
def test_get_topics_partitions(self):
client = Client(self.serviceUrl)
topic_partitioned = "persistent://public/default/test_get_topics_partitions"
topic_non_partitioned = "persistent://public/default/test_get_topics_not-partitioned"
url1 = self.adminUrl + "/admin/v2/persistent/public/default/test_get_topics_partitions/partitions"
doHttpPut(url1, "3")
self.assertEqual(
client.get_topic_partitions(topic_partitioned),
[
"persistent://public/default/test_get_topics_partitions-partition-0",
"persistent://public/default/test_get_topics_partitions-partition-1",
"persistent://public/default/test_get_topics_partitions-partition-2",
],
)
self.assertEqual(client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned])
client.close()
def test_token_auth(self):
with open(TOKEN_PATH) as tf:
token = tf.read().strip()
# Use adminUrl to test both HTTP request and binary protocol
client = Client(self.adminUrl, authentication=AuthenticationToken(token))
consumer = client.subscribe(
"persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
client.close()
def test_token_auth_supplier(self):
def read_token():
with open(TOKEN_PATH) as tf:
return tf.read().strip()
client = Client(self.serviceUrl, authentication=AuthenticationToken(read_token))
consumer = client.subscribe(
"persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
client.close()
def test_producer_consumer_zstd(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-producer-consumer-zstd", "my-sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer(
"my-python-topic-producer-consumer-zstd", compression_type=CompressionType.ZSTD
)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
consumer.unsubscribe()
client.close()
def test_client_reference_deleted(self):
def get_producer():
cl = Client(self.serviceUrl)
return cl.create_producer(topic="foobar")
producer = get_producer()
producer.send(b"test_payload")
#####
def test_get_topic_name(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
)
producer = client.create_producer("persistent://public/default/topic_name_test")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertEqual(msg.topic_name(), "persistent://public/default/topic_name_test")
client.close()
def test_get_partitioned_topic_name(self):
client = Client(self.serviceUrl)
url1 = self.adminUrl + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
doHttpPut(url1, "3")
partitions = [
"persistent://public/default/partitioned_topic_name_test-partition-0",
"persistent://public/default/partitioned_topic_name_test-partition-1",
"persistent://public/default/partitioned_topic_name_test-partition-2",
]
self.assertEqual(
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions
)
consumer = client.subscribe(
"persistent://public/default/partitioned_topic_name_test",
"partitioned_topic_name_test_sub",
consumer_type=ConsumerType.Shared,
)
producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg.topic_name() in partitions)
client.close()
def test_shutdown_client(self):
client = Client(self.serviceUrl)
producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
producer.send(b"hello")
client.shutdown()
try:
producer.send(b"hello")
self.assertTrue(False)
except pulsar.PulsarException:
# Expected
pass
def test_listener_name_client(self):
client = Client(self.serviceUrl, listener_name='test')
try:
producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
self.fail()
except pulsar.PulsarException:
# Expected
pass
client.close()
def test_negative_acks(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"test_negative_acks", "test", schema=pulsar.schema.StringSchema(), negative_ack_redelivery_delay_ms=1000
)
producer = client.create_producer("test_negative_acks", schema=pulsar.schema.StringSchema())
for i in range(10):
producer.send_async("hello-%d" % i, callback=None)
producer.flush()
for i in range(10):
msg = consumer.receive()
self.assertEqual(msg.value(), "hello-%d" % i)
consumer.negative_acknowledge(msg)
for i in range(10):
msg = consumer.receive()
self.assertEqual(msg.value(), "hello-%d" % i)
consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
client.close()
def test_connect_timeout(self):
client = pulsar.Client(
service_url="pulsar://192.0.2.1:1234",
connection_timeout_ms=1000, # 1 second
)
t1 = time.time()
try:
producer = client.create_producer("test_connect_timeout")
self.fail("create_producer should not succeed")
except pulsar.ConnectError as expected:
print("expected error: {} when create producer".format(expected))
t2 = time.time()
self.assertGreater(t2 - t1, 1.0)
self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough
client.close()
def test_json_schema_encode(self):
schema = JsonSchema(TestRecord)
record = TestRecord(a=1, b=2)
# Ensure that encoding a JsonSchema more than once works and produces the same result
first_encode = schema.encode(record)
second_encode = schema.encode(record)
self.assertEqual(first_encode, second_encode)
def test_configure_log_level(self):
client = pulsar.Client(
service_url="pulsar://localhost:6650",
logger=pulsar.ConsoleLogger(pulsar.LoggerLevel.Debug)
)
producer = client.create_producer(
topic='test_log_level'
)
producer.send(b'hello')
def test_configure_log_to_file(self):
client = pulsar.Client(
service_url="pulsar://localhost:6650",
logger=pulsar.FileLogger(pulsar.LoggerLevel.Debug, 'test.log')
)
producer = client.create_producer(
topic='test_log_to_file'
)
producer.send(b'hello')
def test_logger_thread_leaks(self):
def _do_connect(close):
logger = logging.getLogger(str(threading.current_thread().ident))
logger.setLevel(logging.INFO)
client = pulsar.Client(
service_url="pulsar://localhost:6650",
io_threads=4,
message_listener_threads=4,
operation_timeout_seconds=1,
log_conf_file_path=None,
authentication=None,
logger=logger,
)
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
if close:
client.close()
for should_close in (True, False):
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
_do_connect(should_close)
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
threads = []
for _ in range(10):
threads.append(threading.Thread(target=_do_connect, args=(should_close)))
threads[-1].start()
for thread in threads:
thread.join()
assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)
def test_chunking(self):
client = Client(self.serviceUrl)
data_size = 10 * 1024 * 1024
producer = client.create_producer(
'test_chunking',
chunking_enabled=True
)
consumer = client.subscribe('test_chunking', "my-subscription",
max_pending_chunked_message=10,
auto_ack_oldest_chunked_message_on_queue_full=False
)
producer.send(bytes(bytearray(os.urandom(data_size))), None)
msg = consumer.receive(TM)
self.assertEqual(len(msg.data()), data_size)
def test_invalid_chunking_config(self):
client = Client(self.serviceUrl)
self._check_value_error(lambda: client.create_producer(
'test_invalid_chunking_config',
chunking_enabled=True,
batching_enabled=True
))
def _check_value_error(self, fun):
with self.assertRaises(ValueError):
fun()
def _check_type_error(self, fun):
with self.assertRaises(TypeError):
fun()
def _test_basic_auth(self, id, auth):
client = Client(self.adminUrl, authentication=auth)
topic = "persistent://private/auth/my-python-topic-basic-auth-" + str(id)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
producer = client.create_producer(topic)
producer.send(b"hello")
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello")
client.close()
def test_basic_auth(self):
username = "admin"
password = "123456"
self._test_basic_auth(0, AuthenticationBasic(username, password))
self._test_basic_auth(1, AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}"}}'.format(username, password)
))
def test_basic_auth_method(self):
username = "admin"
password = "123456"
self._test_basic_auth(2, AuthenticationBasic(username, password, 'basic'))
with self.assertRaises(pulsar.AuthorizationError):
self._test_basic_auth(3, AuthenticationBasic(username, password, 'unknown'))
self._test_basic_auth(4, AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}", "method": "basic"}}'.format(username, password)
))
with self.assertRaises(pulsar.AuthorizationError):
self._test_basic_auth(5, AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}", "method": "unknown"}}'.format(username, password)
))
def test_invalid_basic_auth(self):
username = "invalid"
password = "123456"
client = Client(self.adminUrl, authentication=AuthenticationBasic(username, password))
topic = "persistent://private/auth/my-python-topic-invalid-basic-auth"
with self.assertRaises(pulsar.ConnectError):
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
client = Client(self.adminUrl, authentication=AuthenticationBasic(
auth_params_string='{{"username": "{}","password": "{}"}}'.format(username, password)
))
with self.assertRaises(pulsar.ConnectError):
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
with self.assertRaises(RuntimeError):
AuthenticationBasic(auth_params_string='invalid auth params')
def test_send_async_no_deadlock(self):
client = Client(self.serviceUrl)
producer = client.create_producer('test_send_async_no_deadlock')
def send_callback(res, msg):
print(f"Message '{msg}' published res={res}")
for i in range(30):
producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)
producer.flush()
client.close()
def test_keyshare_policy(self):
with self.assertRaises(ValueError):
# Raise error because sticky ranges are not provided.
pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
)
expected_key_shared_mode = pulsar.KeySharedMode.Sticky
expected_allow_out_of_order_delivery = True
expected_sticky_ranges = [(0, 100), (101,200)]
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=expected_key_shared_mode,
allow_out_of_order_delivery=expected_allow_out_of_order_delivery,
sticky_ranges=expected_sticky_ranges
)
self.assertEqual(consumer_key_shared_policy.key_shared_mode, expected_key_shared_mode)
self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, expected_allow_out_of_order_delivery)
self.assertEqual(consumer_key_shared_policy.sticky_ranges, expected_sticky_ranges)
def test_keyshared_invalid_sticky_ranges(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-invalid-" + str(time.time())
with self.assertRaises(ValueError):
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
sticky_ranges=[(0,65536)]
)
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
start_message_id_inclusive=True,
key_shared_policy=consumer_key_shared_policy)
with self.assertRaises(ValueError):
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
sticky_ranges=[(0, 100), (50, 150)]
)
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
start_message_id_inclusive=True,
key_shared_policy=consumer_key_shared_policy)
def test_keyshared_autosplit(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-autosplit-" + str(time.time())
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.AutoSplit,
allow_out_of_order_delivery=True,
)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
producer = client.create_producer(topic)
for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
msgs = []
while True:
try:
msg = consumer.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer.acknowledge(msg)
while True:
try:
msg = consumer2.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer2.acknowledge(msg)
self.assertEqual(len(msgs), 10)
client.close()
def test_sticky_autosplit(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-sticky-" + str(time.time())
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=True,
sticky_ranges=[(0,30000)],
)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-1',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=True,
sticky_ranges=[(30001, 65535)],
)
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-2',
start_message_id_inclusive=True, key_shared_policy=consumer2_key_shared_policy)
producer = client.create_producer(topic)
for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)
msgs = []
while True:
try:
msg = consumer.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer.acknowledge(msg)
while True:
try:
msg = consumer2.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer2.acknowledge(msg)
self.assertEqual(len(msgs), 10)
client.close()
def test_acknowledge_failed(self):
client = Client(self.serviceUrl)
topic = 'test_acknowledge_failed'
producer = client.create_producer(topic)
consumer1 = client.subscribe(topic, 'sub1', consumer_type=ConsumerType.Shared)
consumer2 = client.subscribe(topic, 'sub2', consumer_type=ConsumerType.KeyShared)
msg_id = producer.send('hello'.encode())
msg1 = consumer1.receive()
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer1.acknowledge_cumulative(msg1)
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer1.acknowledge_cumulative(msg1.message_id())
msg2 = consumer2.receive()
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer2.acknowledge_cumulative(msg2)
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer2.acknowledge_cumulative(msg2.message_id())
consumer = client.subscribe([topic, topic + '-another'], 'sub')
# The message id does not have a topic name
with self.assertRaises(pulsar.OperationNotSupported):
consumer.acknowledge(msg_id)
client.close()
def test_batch_index_ack(self):
topic_name = 'test-batch-index-ack-3'
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(topic_name,
batching_enabled=True,
batching_max_messages=100,
batching_max_publish_delay_ms=10000)
consumer = client.subscribe(topic_name,
subscription_name='test-batch-index-ack',
batch_index_ack_enabled=True)
# Make sure send 0~5 is a batch msg.
for i in range(5):
producer.send_async(b"hello-%d" % i, callback=None)
producer.flush()
# Receive msgs and just ack 0, 1 msgs
results = []
for i in range(5):
msg = consumer.receive()
print("receive from {}".format(msg.message_id()))
results.append(msg)
assert len(results) == 5
for i in range(2):
consumer.acknowledge(results[i])
time.sleep(0.2)
# Restart consumer after, just receive 2~5 msg.
consumer.close()
consumer = client.subscribe(topic_name,
subscription_name='test-batch-index-ack',
batch_index_ack_enabled=True)
results2 = []
for i in range(2, 5):
msg = consumer.receive()
results2.append(msg)
assert len(results2) == 3
# assert no more msgs.
with self.assertRaises(pulsar.Timeout):
consumer.receive(timeout_millis=1000)
client.close()
def test_dead_letter_policy_config(self):
with self.assertRaises(ValueError):
ConsumerDeadLetterPolicy(-1)
policy = ConsumerDeadLetterPolicy(10)
self.assertEqual(10, policy.max_redeliver_count)
self.assertEqual("", policy.dead_letter_topic)
self.assertEqual("", policy.initial_subscription_name)
def test_dead_letter_policy(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-test-dlq" + str(time.time())
dlq_topic = 'dlq-' + topic
max_redeliver_count = 5
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
dead_letter_policy=ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub'))
dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared)
# Sen num msgs.
producer = client.create_producer(topic)
num = 10
for i in range(num):
producer.send(b"hello-%d" % i)
producer.flush()
# Redelivery all messages maxRedeliverCountNum time.
for i in range(1, num * max_redeliver_count + num + 1):
msg = consumer.receive()
if i % num == 0:
consumer.redeliver_unacknowledged_messages()
print(f"Start redeliver msgs '{i}'")
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
for i in range(num):
msg = dlq_consumer.receive()
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
dlq_consumer.acknowledge(msg)
with self.assertRaises(pulsar.Timeout):
dlq_consumer.receive(100)
client.close()
def test_regex_subscription(self):
client = Client(self.serviceUrl)
topic1 = "persistent://public/default/test-regex-sub-1"
topic2 = "persistent://public/default/test-regex-sub-2"
topic3 = "non-persistent://public/default/test-regex-sub-3"
topic4 = "persistent://public/default/no-match-test-regex-sub-3" # no match pattern rule topic.
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
producer4 = client.create_producer(topic4)
consumer_all = client.subscribe(
re.compile('public/default/test-regex-sub-.*'), "regex-sub-all",
consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.AllTopics
)
consumer_persistent = client.subscribe(
re.compile('public/default/test-regex-sub-.*'), "regex-sub-persistent",
consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.PersistentOnly
)
consumer_non_persistent = client.subscribe(
re.compile('public/default/test-regex-sub-.*'), "regex-sub-non-persistent",
consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.NonPersistentOnly
)
num = 10
for i in range(num):
producer1.send(b"hello-1-%d" % i)
producer2.send(b"hello-2-%d" % i)
producer3.send(b"hello-3-%d" % i)
producer4.send(b"hello-4-%d" % i)
# Assert consumer_all.
received_topics = set()
for i in range(3 * num):
msg = consumer_all.receive(TM)
topic_name = msg.topic_name()
self.assertIn(topic_name, [topic1, topic2, topic3])
received_topics.add(topic_name)
consumer_all.acknowledge(msg)
self.assertEqual(received_topics, {topic1, topic2, topic3})
with self.assertRaises(pulsar.Timeout):
consumer_all.receive(100)
# Assert consumer_persistent.
received_topics.clear()
for i in range(2 * num):
msg = consumer_persistent.receive(TM)
topic_name = msg.topic_name()
self.assertIn(topic_name, [topic1, topic2])
received_topics.add(topic_name)
consumer_persistent.acknowledge(msg)
self.assertEqual(received_topics, {topic1, topic2})
with self.assertRaises(pulsar.Timeout):
consumer_persistent.receive(100)
# Assert consumer_non_persistent.
received_topics.clear()
for i in range(num):
msg = consumer_non_persistent.receive(TM)
topic_name = msg.topic_name()
self.assertIn(topic_name, [topic3])
received_topics.add(topic_name)
consumer_non_persistent.acknowledge(msg)
self.assertEqual(received_topics, {topic3})
with self.assertRaises(pulsar.Timeout):
consumer_non_persistent.receive(100)
client.close()
def test_consumer_name(self):
client = Client(self.serviceUrl)
name = 'my-consumer-name'
consumer = client.subscribe('test_consumer_name', 'sub', consumer_name=name)
self.assertEqual(consumer.consumer_name(), name)
client.close()
if __name__ == "__main__":
main()