blob: 90a2465d98ff085211e51ca2c436060156377afe [file] [log] [blame] [view]
# Pulsar Python client library
Pulsar Python client library is based on the existing C++ client library.
All the same features are exposed through Python interface.
## Install
First follow the instructions to compile the Pulsar C++ client library.
That will also build the Python binding for the library.
Currently the only supported Python version is 2.7.
To install the Python bindings:
```shell
$ cd pulsar-client-cpp/python
$ sudo python setup.py install
```
## Examples
#### Producer example
```python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
'persistent://sample/standalone/ns/my-topic')
for i in range(10):
producer.send('Hello-%d' % i)
client.close()
```
##### Consumer Example
```python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
'persistent://sample/standalone/ns/my-topic',
'my-sub')
while True:
msg = consumer.receive()
print("Received message '%s' id='%s'", msg.data(), msg.message_id())
consumer.acknowledge(msg)
client.close()
```
#### Async Producer example
```python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
'persistent://sample/standalone/ns/my-topic',
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=10
)
def send_callback(res, msg):
print('Message published res=%s', res)
while True:
producer.send_async('Hello-%d' % i, send_callback)
client.close()
```
#### PyDoc API Reference
```
CLASSES
Authentication
Client
Consumer
Producer
class Authentication
| Authentication provider object
|
| Methods defined here:
|
| __init__(self, dynamicLibPath, authParamsString)
| Create the authentication provider instance
|
| -- Args --
| dynamicLibPath -- Path to the authentication provider shared library (eg: 'tls.so')
| authParamsString -- comma separated list of provider specific configuration params
class Client
| Pulsar client.
|
| A single client instance can be used to create producers and consumers on
| multiple topics.
|
| The client will share the same connection pool and threads across all the
| producers and consumers.
|
| Methods defined here:
|
| __init__(self, service_url, authentication=None, operation_timeout_seconds=30,
| io_threads=1, message_listener_threads=1, concurrent_lookup_requests=5000,
| log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None,
| tls_allow_insecure_connection=False)
| Create a new Pulsar client instance
|
| -- Args --
| service_url -- The Pulsar service url eg: pulsar://my-broker.com:6650/
|
| -- Options --
| authentication -- Set the authentication provider to be used with the broker
| operation_timeout_seconds -- Set timeout on client operations (subscribe, create
| producer, close, unsubscribe)
| io_threads -- Set the number of IO threads to be used by the Pulsar client.
| message_listener_threads -- Set the number of threads to be used by the Pulsar client
| when delivering messages through message listener. Default
| is 1 thread per Pulsar client.
| If using more than 1 thread, messages for distinct
| message_listener will be delivered in different threads,
| however a single MessageListener will always be assigned to
| the same thread.
| concurrent_lookup_requests -- Number of concurrent lookup-requests allowed on each
| broker-connection to prevent overload on broker.
| log_conf_file_path -- Initialize log4cxx from a conf file
| use_tls -- configure whether to use TLS encryption on the connection
| tls_trust_certs_file_path -- Set the path to the trusted TLS certificate file
| tls_allow_insecure_connection -- Configure whether the Pulsar client accept untrusted TLS
| certificate from broker
|
| close(self)
| Close the client and all the associated producers and consumers
|
| create_producer(self, topic, send_timeout_millis=30000,
| compression_type=_pulsar.CompressionType.None, max_pending_messages=30000,
| block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000,
| batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms=10)
| Create a new producer on a given topic
|
| -- Args --
| topic -- the topic name
|
| -- Options --
| send_timeout_seconds -- If a message is not acknowledged by the server before the
| send_timeout expires, an error will be reported
| compression_type -- Set the compression type for the producer. By default, message
| payloads are not compressed. Supported compression types are:
| CompressionType.LZ4 and CompressionType.ZLib
| max_pending_messages -- Set the max size of the queue holding the messages pending to
| receive an acknowledgment from the broker.
| block_if_queue_full -- Set whether send_async} operations should block when the outgoing
| message queue is full.
| message_routing_mode -- Set the message routing mode for the partitioned producer
|
| return a new Producer object
|
| subscribe(self, topic, subscription_name,
| consumer_type=_pulsar.ConsumerType.Exclusive, message_listener=None,
| receiver_queue_size=1000, consumer_name=None,
| unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000)
| Subscribe to the given topic and subscription combination
|
| -- Args --
| topic -- The name of the topic
| subscription -- The name of the subscription
|
| -- Options --
| consumer_type -- Select the subscription type to be used when subscribing to the topic.
| message_listener -- Sets a message listener for the consumer. When the listener is set,
| application will receive messages through it. Calls to
| consumer.receive() will not be allowed.
| receiver_queue_size -- Sets the size of the consumer receive queue. The consumer receive
| ueue controls how many messages can be accumulated by the Consumer
| before the application calls receive(). Using a higher value could
| potentially increase the consumer throughput at the expense of bigger
| memory utilization.
|
| Setting the consumer queue size as zero decreases the throughput of
| the consumer, by disabling pre-fetching of messages. This approach
| improves the message distribution on shared subscription, by pushing
| messages only to the consumers that are ready to process them.
| Neither receive with timeout nor Partitioned Topics can be used if
| the consumer queue size is zero. The receive() function call should
| not be interrupted when the consumer queue size is zero. Default
| value is 1000 messages and should be good for most use cases.
| consumer_name -- Sets the consumer name
| unacked_messages_timeout_ms -- Set the timeout in milliseconds for unacknowledged messages,
| the timeout needs to be greater than 10 seconds. An
| Exception is thrown if the given value is less than 10 seconds
| If a successful acknowledgement is not sent within the
| timeout all the unacknowledged messages are redelivered.
| broker_consumer_stats_cache_time_ms -- Set the time duration for which the broker side
| consumer stats will be cached in the client.
|
| return a new Consumer object
class Consumer
| Pulsar consumer
|
| Methods defined here:
|
| acknowledge(self, message)
| Acknowledge the reception of a single message.
|
| This method will block until an acknowledgement is sent to the broker.
| After that, the message will not be re-delivered to this consumer.
|
| message -- The received message or message id
|
| acknowledge_cumulative(self, message)
| Acknowledge the reception of all the messages in the stream up to (and
| including) the provided message.
|
| This method will block until an acknowledgement is sent to the broker.
| After that, the messages will not be re-delivered to this consumer.
|
| message -- The received message or message id
|
| close(self)
| Close the consumer
|
| pause_message_listener(self)
| Pause receiving messages via the message_listener, until
| resume_message_listener() is called.
|
| receive(self, timeout_millis=None)
| Receive a single message.
|
| If a message is not immediately available, this method will block until
| a new message is available.
|
| timeout_millis -- If specified, the receive will raise an exception if
| a message is not availble withing the timeout
|
| redeliver_unacknowledged_messages(self)
| Redelivers all the unacknowledged messages. In Failover mode, the request
| is ignored if the consumer is not active for the given topic. In Shared
| mode, the consumers messages to be redelivered are distributed across all
| the connected consumers. This is a non blocking call and doesn't throw an
| exception. In case the connection breaks, the messages are redelivered
| after reconnect.
|
| resume_message_listener(self)
| Resume receiving the messages via the messageListener.
| Asynchronously receive all the messages enqueued from time
| pause_message_listener() was called.
|
| subscription_name(self)
| return the subscription name
|
| topic(self)
| return the topic this consumer is subscribed to
|
| unsubscribe(self)
| Unsubscribe the current consumer from the topic.
|
| This method will block until the operation is completed. Once the consumer
| is unsubscribed, no more messages will be received and subsequent new
| messages will not be retained for this consumer.
|
| This consumer object cannot be reused.
class Producer
| Producer object
|
| The producer is used to publish messages on a topic
|
| Methods defined here:
|
| close(self)
| Close the producer
|
| send(self, content, properties=None, partition_key=None, replication_clusters=None,
| disable_replication=False)
| Publish a message on the topic. Blocks until the message is acknowledged
|
| -- Args --
| content -- A string with the message payload
|
| -- Options --
| properties -- dict of application defined string properties
| partition_key -- set partition key for the message routing. Hash of this key is used to
| determine message's destination partition
| replication_clusters -- override namespace replication clusters. note that it is the
| caller's responsibility to provide valid cluster names, and that
| all clusters have been previously configured as destinations.
| Given an empty list, the message will replicate per the namespace
| configuration.
| disable_replication -- Do not replicate this message
|
| send_async(self, content, callback, properties=None, partition_key=None,
| replication_clusters=None, disable_replication=False)
| Send a message asynchronously
|
| The `callback` will be invoked once the message has been acknowledged
| by the broker.
|
| Example:
| def callback(res, msg):
| print 'Message published:', res
|
| producer.send_async(msg, callback)
|
| When the producer queue is full, by default the message will be rejected
| and the callback invoked with an error code.
|
| -- Args --
| content -- A string with the message payload
|
| -- Options --
| properties -- dict of application defined string properties
| partition_key -- set partition key for the message routing. Hash of this key is used to
| determine message's destination partition
| replication_clusters -- override namespace replication clusters. note that it is the
| caller's responsibility to provide valid cluster names, and that
| all clusters have been previously configured as destinations.
| Given an empty list, the message will replicate per the namespace
| configuration.
| disable_replication -- Do not replicate this message
```