blob: fc9a6f477a129d14dd1ec080cec0e36b29dfe58c [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from unittest import TestCase, main
import time
from pulsar import Client, MessageId, \
CompressionType, ConsumerType
from _pulsar import ProducerConfiguration, ConsumerConfiguration
try:
# For Python 3.0 and later
from urllib.request import urlopen, Request
except ImportError:
# Fall back to Python 2's urllib2
from urllib2 import urlopen, Request
def doHttpPost(url, data):
req = Request(url, data.encode())
req.add_header('Content-Type', 'application/json')
urlopen(req)
class PulsarTest(TestCase):
serviceUrl = 'pulsar://localhost:8885'
adminUrl = 'http://localhost:8765'
def test_producer_config(self):
conf = ProducerConfiguration()
conf.send_timeout_millis(12)
self.assertEqual(conf.send_timeout_millis(), 12)
self.assertEqual(conf.compression_type(), CompressionType.NONE)
conf.compression_type(CompressionType.LZ4)
self.assertEqual(conf.compression_type(), CompressionType.LZ4)
conf.max_pending_messages(120)
self.assertEqual(conf.max_pending_messages(), 120)
def test_consumer_config(self):
conf = ConsumerConfiguration()
self.assertEqual(conf.consumer_type(), ConsumerType.Exclusive)
conf.consumer_type(ConsumerType.Shared)
self.assertEqual(conf.consumer_type(), ConsumerType.Shared)
self.assertEqual(conf.consumer_name(), '')
conf.consumer_name("my-name")
self.assertEqual(conf.consumer_name(), "my-name")
def test_simple_producer(self):
client = Client(self.serviceUrl)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic')
producer.send('hello')
producer.close()
client.close()
def test_producer_send_async(self):
client = Client(self.serviceUrl)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic')
sent_messages = []
def send_callback(producer, msg):
sent_messages.append(msg)
producer.send_async('hello', send_callback)
producer.send_async('hello', send_callback)
producer.send_async('hello', send_callback)
time.sleep(0.1)
self.assertEqual(len(sent_messages), 3)
client.close()
def test_producer_consumer(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-producer-consumer',
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-producer-consumer')
producer.send('hello')
msg = consumer.receive(1000)
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello')
try:
msg = consumer.receive(100)
self.assertTrue(False) # Should not reach this point
except:
pass # Exception is expected
client.close()
def test_message_listener(self):
client = Client(self.serviceUrl)
received_messages = []
def listener(consumer, msg):
print("Got message: %s" % msg)
received_messages.append(msg)
consumer.acknowledge(msg)
client.subscribe('persistent://sample/standalone/ns/my-python-topic-listener',
'my-sub',
consumer_type=ConsumerType.Exclusive,
message_listener=listener)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-listener')
producer.send('hello-1')
producer.send('hello-2')
producer.send('hello-3')
time.sleep(0.1)
self.assertEqual(len(received_messages), 3)
self.assertEqual(received_messages[0].data(), "hello-1")
self.assertEqual(received_messages[1].data(), "hello-2")
self.assertEqual(received_messages[2].data(), "hello-3")
client.close()
def test_reader_simple(self):
client = Client(self.serviceUrl)
reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-simple',
MessageId.earliest)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-simple')
producer.send('hello')
msg = reader.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello')
try:
msg = reader.read_next(100)
self.assertTrue(False) # Should not reach this point
except:
pass # Exception is expected
reader.close()
client.close()
def test_reader_on_last_message(self):
client = Client(self.serviceUrl)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message')
for i in range(10):
producer.send('hello-%d' % i)
reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message',
MessageId.latest)
for i in range(10, 20):
producer.send('hello-%d' % i)
for i in range(10, 20):
msg = reader.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello-%d' % i)
reader.close()
client.close()
def test_reader_on_specific_message(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message')
for i in range(10):
producer.send('hello-%d' % i)
reader1 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message',
MessageId.earliest)
for i in range(5):
msg = reader1.read_next()
last_msg_id = msg.message_id()
reader2 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message',
last_msg_id)
for i in range(5, 10):
msg = reader2.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello-%d' % i)
reader1.close()
reader2.close()
client.close()
def test_reader_on_specific_message_with_batches(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches',
batching_enabled=True,
batching_max_publish_delay_ms=1000)
for i in range(10):
producer.send_async('hello-%d' % i, None)
# Send one sync message to make sure everything was published
producer.send('hello-10')
reader1 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches',
MessageId.earliest)
for i in range(5):
msg = reader1.read_next()
last_msg_id = msg.message_id()
reader2 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches',
last_msg_id)
for i in range(5, 11):
msg = reader2.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello-%d' % i)
reader1.close()
reader2.close()
client.close()
def test_producer_sequence_after_reconnection(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + '/admin/namespaces/sample/standalone/ns1/deduplication',
'true')
client = Client(self.serviceUrl)
topic = 'persistent://sample/standalone/ns1/my-python-test-producer-sequence-after-reconnection-' + str(time.time())
producer = client.create_producer(topic, producer_name='my-producer-name')
self.assertEqual(producer.last_sequence_id(), -1)
for i in range(10):
producer.send('hello-%d' % i)
self.assertEqual(producer.last_sequence_id(), i)
producer.close()
producer = client.create_producer(topic, producer_name='my-producer-name')
self.assertEqual(producer.last_sequence_id(), 9)
for i in range(10, 20):
producer.send('hello-%d' % i)
self.assertEqual(producer.last_sequence_id(), i)
def test_producer_deduplication(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + '/admin/namespaces/sample/standalone/ns1/deduplication',
'true')
client = Client(self.serviceUrl)
topic = 'persistent://sample/standalone/ns1/my-python-test-producer-deduplication-' + str(time.time())
producer = client.create_producer(topic, producer_name='my-producer-name')
self.assertEqual(producer.last_sequence_id(), -1)
consumer = client.subscribe(topic, 'my-sub')
producer.send('hello-0', sequence_id=0)
producer.send('hello-1', sequence_id=1)
producer.send('hello-2', sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
# Repeat the messages and verify they're not received by consumer
producer.send('hello-1', sequence_id=1)
producer.send('hello-2', sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
for i in range(3):
msg = consumer.receive()
self.assertEqual(msg.data(), 'hello-%d' % i)
consumer.acknowledge(msg)
try:
# No other messages should be received
consumer.receive(timeout_millis=1000)
self.assertTrue(False)
except:
# Exception is expected
pass
producer.close()
producer = client.create_producer(topic, producer_name='my-producer-name')
self.assertEqual(producer.last_sequence_id(), 2)
# Repeat the messages and verify they're not received by consumer
producer.send('hello-1', sequence_id=1)
producer.send('hello-2', sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
try:
# No other messages should be received
consumer.receive(timeout_millis=1000)
self.assertTrue(False)
except:
# Exception is expected
pass
if __name__ == '__main__':
main()